You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by pi...@apache.org on 2021/10/15 08:49:59 UTC

[submarine] branch master updated: SUBMARINE-1051. remove hadoop dependency and profile

This is an automated email from the ASF dual-hosted git repository.

pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d54a09  SUBMARINE-1051. remove hadoop dependency and profile
9d54a09 is described below

commit 9d54a09f5cece9c54a8c6df3b9245a31907e1e0b
Author: Brandon Lin <fa...@gmail.com>
AuthorDate: Mon Oct 11 16:49:56 2021 +0800

    SUBMARINE-1051. remove hadoop dependency and profile
    
    ### What is this PR for?
    As I mentioned in the ticket, this PR removes the Hadoop dependencies and profiles from the Maven build.
    
    ### What type of PR is it?
    Improvement
    
    ### Todos
    * Since the profiles have been removed, we should check the Maven commands in all documents and remove the profile argument.
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-1051
    ### How should this be tested?
    This PR has passed the tests in submarine-test:test-k8s and submarine-test:e2e.
    ### Screenshots (if appropriate)
    (The tests passed, but the run was still marked as failed because of a missing file.)
    <img width="1678" alt="截圖 2021-10-11 上午10 45 24" src="https://user-images.githubusercontent.com/5687317/136763304-ecca0207-4f8d-4dda-a3af-7cdadb1d9048.png">
    <img width="1679" alt="截圖 2021-10-11 下午5 03 24" src="https://user-images.githubusercontent.com/5687317/136763367-27e35570-623b-41f3-84cd-8322bd3cf3f6.png">
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions?  No
    * Does this need new documentation? No
    
    Author: Brandon Lin <fa...@gmail.com>
    
    Signed-off-by: Kevin <pi...@apache.org>
    
    Closes #773 from FatalLin/SUBMARINE-1051 and squashes the following commits:
    
    e824eb21 [Brandon Lin] remove hadoop dependency and profile
---
 pom.xml                                            | 101 +---
 submarine-client/pom.xml                           |  61 ---
 .../apache/submarine/client/cli/AbstractCli.java   |  52 --
 .../org/apache/submarine/client/cli/CliUtils.java  | 158 -------
 .../apache/submarine/client/cli/ShowJobCli.java    | 131 ------
 .../submarine/client/cli/param/Localization.java   | 137 ------
 .../client/cli/param/ParametersHolder.java         | 524 ---------------------
 .../submarine/client/cli/param/Quicklink.java      |  96 ----
 .../submarine/client/cli/param/RunParameters.java  | 111 -----
 .../client/cli/param/ShowJobParameters.java        |  25 -
 .../cli/param/runjob/MXNetRunJobParameters.java    | 236 ----------
 .../cli/param/runjob/PyTorchRunJobParameters.java  | 136 ------
 .../client/cli/param/runjob/RunJobParameters.java  | 361 --------------
 .../param/runjob/TensorFlowRunJobParameters.java   | 261 ----------
 .../client/cli/runjob/RoleParameters.java          |  88 ----
 .../submarine/client/cli/runjob/RunJobCli.java     | 371 ---------------
 .../client/cli/ShowJobCliParsingTest.java          | 104 ----
 .../submarine/client/cli/YamlConfigTestUtils.java  |  69 ---
 .../cli/runjob/RunJobCliParsingCommonTest.java     | 132 ------
 .../cli/runjob/RunJobCliParsingCommonYamlTest.java | 258 ----------
 .../runjob/RunJobCliParsingParameterizedTest.java  | 196 --------
 .../runjob/mxnet/RunJobCliParsingMXNetTest.java    | 175 -------
 .../mxnet/RunJobCliParsingMXNetYamlTest.java       | 271 -----------
 .../pytorch/RunJobCliParsingPyTorchTest.java       | 282 -----------
 .../pytorch/RunJobCliParsingPyTorchYamlTest.java   | 271 -----------
 .../tensorflow/RunJobCliParsingTensorFlowTest.java | 239 ----------
 ...nJobCliParsingTensorFlowYamlStandaloneTest.java | 206 --------
 .../RunJobCliParsingTensorFlowYamlTest.java        | 308 ------------
 submarine-commons/commons-runtime/pom.xml          | 115 -----
 .../submarine/commons/runtime/ClientContext.java   |  84 ----
 .../submarine/commons/runtime/JobMonitor.java      |  94 ----
 .../submarine/commons/runtime/JobSubmitter.java    |  42 --
 .../submarine/commons/runtime/RuntimeFactory.java  | 104 ----
 .../runtime/fs/DefaultRemoteDirectoryManager.java  | 168 -------
 .../runtime/fs/FSBasedSubmarineStorageImpl.java    | 105 -----
 .../commons/runtime/fs/RemoteDirectoryManager.java |  53 ---
 .../commons/runtime/param/BaseParameters.java      |  71 ---
 .../submarine/commons/runtime/param/Parameter.java |  45 --
 .../commons/runtime/resource/ResourceUtils.java    | 360 --------------
 .../commons/runtime/MockClientContext.java         |  44 --
 .../runtime/fs/MockRemoteDirectoryManager.java     | 212 ---------
 submarine-dist/pom.xml                             |  12 +-
 submarine-test/test-e2e/pom.xml                    |   2 +-
 43 files changed, 7 insertions(+), 6864 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3727f5a..3538d01 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,7 +92,6 @@
     <npm.version>6.14.11</npm.version>
     <io.swagger.version>2.1.2</io.swagger.version>
 
-    <hadoop.common.build.dir>${basedir}/../hadoop-common-project/hadoop-common/target</hadoop.common.build.dir>
     <slf4j.version>1.7.25</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <commons.logging.version>1.1.3</commons.logging.version>
@@ -137,6 +136,10 @@
     <plugin.failsafe.version>2.17</plugin.failsafe.version>
     <!--  embedded-ldap-junit  -->
     <embedded-ldap-junit.version>0.7</embedded-ldap-junit.version>
+    <jaxb-api.version>2.2.11</jaxb-api.version>
+    <commons-compress.version>1.4.1</commons-compress.version>
+    <guice-servlet.version>3.0</guice-servlet.version>
+    <guice.version>3.0</guice.version>
   </properties>
 
   <modules>
@@ -192,37 +195,6 @@
         <version>${k8s.client-java.version}</version>
       </dependency>
 
-      <!--  Submarine on YARN  -->
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-yarn-services-api</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-common</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-yarn-api</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-yarn-common</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-yarn-client</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-hdfs</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
       <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-lang3</artifactId>
@@ -283,11 +255,7 @@
         <artifactId>commons-lang</artifactId>
         <version>${commons-lang.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.apache.zookeeper</groupId>
-        <artifactId>zookeeper</artifactId>
-        <version>${zookeeper.version}</version>
-      </dependency>
+
       <dependency>
         <groupId>commons-httpclient</groupId>
         <artifactId>commons-httpclient</artifactId>
@@ -342,65 +310,6 @@
   </dependencyManagement>
 
   <profiles>
-    <profile>
-      <id>hadoop-3.2</id>
-      <properties>
-        <hadoop.version>3.2.1</hadoop.version>
-        <jaxb-api.version>2.2.11</jaxb-api.version>
-        <commons-compress.version>1.19</commons-compress.version>
-        <guice-servlet.version>4.0</guice-servlet.version>
-        <guice.version>4.0</guice.version>
-        <zookeeper.version>3.4.13</zookeeper.version>
-        <guava.version>30.0-jre</guava.version>
-        <jsr305.version>3.0.2</jsr305.version>
-        <profile-id>hadoop-3.2</profile-id>
-      </properties>
-    </profile>
-
-    <profile>
-      <id>hadoop-3.1</id>
-      <properties>
-        <hadoop.version>3.1.3</hadoop.version>
-        <jaxb-api.version>2.2.11</jaxb-api.version>
-        <commons-compress.version>1.19</commons-compress.version>
-        <guice-servlet.version>4.0</guice-servlet.version>
-        <guice.version>4.0</guice.version>
-        <zookeeper.version>3.4.13</zookeeper.version>
-        <profile-id>hadoop-3.1</profile-id>
-      </properties>
-    </profile>
-
-    <profile>
-      <id>hadoop-2.10</id>
-      <activation>
-        <activeByDefault>false</activeByDefault>
-      </activation>
-      <properties>
-        <hadoop.version>2.10.0</hadoop.version>
-        <jaxb-api.version>2.2.2</jaxb-api.version>
-        <commons-compress.version>1.19</commons-compress.version>
-        <guice-servlet.version>3.0</guice-servlet.version>
-        <guice.version>3.0</guice.version>
-        <zookeeper.version>3.4.9</zookeeper.version>
-        <profile-id>hadoop-2.10</profile-id>
-      </properties>
-    </profile>
-
-    <profile>
-      <id>hadoop-2.9</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-      <properties>
-        <hadoop.version>2.9.2</hadoop.version>
-        <jaxb-api.version>2.2.11</jaxb-api.version>
-        <commons-compress.version>1.4.1</commons-compress.version>
-        <guice-servlet.version>3.0</guice-servlet.version>
-        <guice.version>3.0</guice.version>
-        <zookeeper.version>3.4.6</zookeeper.version>
-        <profile-id>hadoop-2.9</profile-id>
-      </properties>
-    </profile>
 
     <profile>
      <id>clover</id>
diff --git a/submarine-client/pom.xml b/submarine-client/pom.xml
index 8f74fe6..e6b119d 100644
--- a/submarine-client/pom.xml
+++ b/submarine-client/pom.xml
@@ -64,67 +64,6 @@
       <scope>test</scope>
     </dependency>
 
-    <!-- Dependencies for Hadoop -->
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.commons</groupId>
-          <artifactId>commons-lang3</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.httpcomponents</groupId>
-          <artifactId>httpclient</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.httpcomponents</groupId>
-          <artifactId>httpcore</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-lang</groupId>
-          <artifactId>commons-lang</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-collections</groupId>
-          <artifactId>commons-collections</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-logging</groupId>
-          <artifactId>commons-logging</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-codec</groupId>
-          <artifactId>commons-codec</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>jackson-core-asl</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>jackson-mapper-asl</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.protobuf</groupId>
-          <artifactId>protobuf-java</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
   </dependencies>
 
 </project>
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/AbstractCli.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/AbstractCli.java
deleted file mode 100644
index 177bf16..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/AbstractCli.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-
-import java.io.IOException;
-
-public abstract class AbstractCli implements Tool {
-  protected ClientContext clientContext;
-
-  public AbstractCli(ClientContext cliContext) {
-    this.clientContext = cliContext;
-  }
-
-  @Override
-  public abstract int run(String[] args)
-      throws ParseException, IOException, YarnException, InterruptedException,
-      SubmarineException;
-
-  @Override
-  public void setConf(Configuration conf) {
-    clientContext.setYarnConfig(conf);
-  }
-
-  @Override
-  public Configuration getConf() {
-    return clientContext.getYarnConfig();
-  }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/CliUtils.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/CliUtils.java
deleted file mode 100644
index 222de1f..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/CliUtils.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.submarine.commons.runtime.fs.RemoteDirectoryManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class CliUtils {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(CliUtils.class);
-
-  /**
-   * Replace patterns inside cli
-   *
-   * @return launch command after pattern replace
-   */
-  public static String replacePatternsInLaunchCommand(String specifiedCli,
-                                                      RunJobParameters jobRunParameters,
-                                                      RemoteDirectoryManager directoryManager)
-      throws IOException {
-    String input = jobRunParameters.getInputPath();
-    String jobDir = jobRunParameters.getCheckpointPath();
-    String savedModelDir = jobRunParameters.getSavedModelPath();
-
-    Map<String, String> replacePattern = new HashMap<>();
-    if (jobDir != null) {
-      replacePattern.put("%" + CliConstants.CHECKPOINT_PATH + "%", jobDir);
-    }
-    if (input != null) {
-      replacePattern.put("%" + CliConstants.INPUT_PATH + "%", input);
-    }
-    if (savedModelDir != null) {
-      replacePattern.put("%" + CliConstants.SAVED_MODEL_PATH + "%",
-          savedModelDir);
-    }
-
-    String newCli = specifiedCli;
-    for (Map.Entry<String, String> replace : replacePattern.entrySet()) {
-      newCli = newCli.replace(replace.getKey(), replace.getValue());
-    }
-
-    return newCli;
-  }
-
-  // Is it for help?
-  public static boolean argsForHelp(String[] args) {
-    if (args == null || args.length == 0)
-      return true;
-
-    if (args.length == 1) {
-      return args[0].equals("-h") || args[0].equals("--help");
-    }
-
-    return false;
-  }
-
-  public static void doLoginIfSecure(String keytab, String principal) throws
-      IOException {
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return;
-    }
-
-    if (StringUtils.isEmpty(keytab) || StringUtils.isEmpty(principal)) {
-      if (StringUtils.isNotEmpty(keytab)) {
-        SubmarineRuntimeException e = new SubmarineRuntimeException("The " +
-            "parameter of " + CliConstants.PRINCIPAL + " is missing.");
-        LOG.error(e.getMessage(), e);
-        throw e;
-      }
-
-      if (StringUtils.isNotEmpty(principal)) {
-        SubmarineRuntimeException e = new SubmarineRuntimeException("The " +
-            "parameter of " + CliConstants.KEYTAB + " is missing.");
-        LOG.error(e.getMessage(), e);
-        throw e;
-      }
-
-      UserGroupInformation user = UserGroupInformation.getCurrentUser();
-      if (user == null || user.getAuthenticationMethod() ==
-          UserGroupInformation.AuthenticationMethod.SIMPLE) {
-        SubmarineRuntimeException e = new SubmarineRuntimeException("Failed " +
-            "to authenticate in secure environment. Please run kinit " +
-            "command in advance or use " + "--" + CliConstants.KEYTAB + "/--" + CliConstants.PRINCIPAL +
-            " parameters");
-        LOG.error(e.getMessage(), e);
-        throw e;
-      }
-      LOG.info("Submarine job is submitted by user: " + user.getUserName());
-      return;
-    }
-
-    File keytabFile = new File(keytab);
-    if (!keytabFile.exists()) {
-      SubmarineRuntimeException e = new SubmarineRuntimeException("No " +
-          "keytab localized at  " + keytab);
-      LOG.error(e.getMessage(), e);
-      throw e;
-    }
-    UserGroupInformation.loginUserFromKeytab(principal, keytab);
-  }
-
-  /**
-   * As hadoop-2.7 doesn't have this method, we add this method in submarine.
-   * @param appIdStr
-   * @return
-   */
-  public static ApplicationId fromString(String appIdStr) {
-    String APPLICATION_ID_PREFIX = "application_";
-    if (!appIdStr.startsWith(APPLICATION_ID_PREFIX)) {
-      throw new IllegalArgumentException("Invalid ApplicationId prefix: "
-          + appIdStr + ". The valid ApplicationId should start with prefix "
-          + "application");
-    }
-    try {
-      int pos1 = APPLICATION_ID_PREFIX.length() - 1;
-      int pos2 = appIdStr.indexOf('_', pos1 + 1);
-      if (pos2 < 0) {
-        throw new IllegalArgumentException("Invalid ApplicationId: "
-            + appIdStr);
-      }
-      long rmId = Long.parseLong(appIdStr.substring(pos1 + 1, pos2));
-      int appId = Integer.parseInt(appIdStr.substring(pos2 + 1));
-      ApplicationId applicationId = ApplicationId.newInstance(rmId, appId);
-      return applicationId;
-    } catch (NumberFormatException n) {
-      throw new IllegalArgumentException("Invalid ApplicationId: "
-          + appIdStr, n);
-    }
-  }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/ShowJobCli.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/ShowJobCli.java
deleted file mode 100644
index 651a542..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/ShowJobCli.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.param.ParametersHolder;
-import org.apache.submarine.client.cli.param.ShowJobParameters;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-import org.apache.submarine.commons.runtime.fs.StorageKeyConstants;
-import org.apache.submarine.commons.runtime.fs.SubmarineStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class ShowJobCli extends AbstractCli {
-  private static final Logger LOG = LoggerFactory.getLogger(ShowJobCli.class);
-
-  private Options options;
-  private ParametersHolder parametersHolder;
-
-  public ShowJobCli(ClientContext cliContext) {
-    super(cliContext);
-    options = generateOptions();
-  }
-
-  public void printUsages() {
-    new HelpFormatter().printHelp("job show", options);
-  }
-
-  private Options generateOptions() {
-    Options options = new Options();
-    options.addOption(CliConstants.NAME, true, "Name of the job");
-    options.addOption("h", "help", false, "Print help");
-    return options;
-  }
-
-  private void parseCommandLineAndGetShowJobParameters(String[] args)
-      throws IOException, YarnException {
-    // Do parsing
-    GnuParser parser = new GnuParser();
-    CommandLine cli;
-    try {
-      cli = parser.parse(options, args);
-      parametersHolder = ParametersHolder
-          .createWithCmdLine(cli, Command.SHOW_JOB);
-      parametersHolder.updateParameters(clientContext);
-    } catch (ParseException e) {
-      printUsages();
-    }
-  }
-
-  private void printIfNotNull(String keyForPrint, String keyInStorage,
-      Map<String, String> jobInfo) {
-    if (jobInfo.containsKey(keyInStorage)) {
-      System.out.println("\t" + keyForPrint + ": " + jobInfo.get(keyInStorage));
-    }
-  }
-
-  private void printJobInfo(Map<String, String> jobInfo) {
-    System.out.println("Job Meta Info:");
-    printIfNotNull("Application Id", StorageKeyConstants.APPLICATION_ID,
-        jobInfo);
-    printIfNotNull("Input Path", StorageKeyConstants.INPUT_PATH, jobInfo);
-    printIfNotNull("Saved Model Path", StorageKeyConstants.SAVED_MODEL_PATH,
-        jobInfo);
-    printIfNotNull("Checkpoint Path", StorageKeyConstants.CHECKPOINT_PATH,
-        jobInfo);
-    printIfNotNull("Run Parameters", StorageKeyConstants.JOB_RUN_ARGS,
-        jobInfo);
-  }
-
-  @VisibleForTesting
-  protected void getAndPrintJobInfo() throws IOException {
-    SubmarineStorage storage =
-        clientContext.getRuntimeFactory().getSubmarineStorage();
-
-    Map<String, String> jobInfo = null;
-    try {
-      jobInfo = storage.getJobInfoByName(getParameters().getName());
-    } catch (IOException e) {
-      LOG.error("Failed to retrieve job info", e);
-      throw e;
-    }
-
-    printJobInfo(jobInfo);
-  }
-
-  @VisibleForTesting
-  public ShowJobParameters getParameters() {
-    return (ShowJobParameters) parametersHolder.getParameters();
-  }
-
-  @Override
-  public int run(String[] args)
-      throws ParseException, IOException, YarnException, InterruptedException,
-      SubmarineException {
-    if (CliUtils.argsForHelp(args)) {
-      printUsages();
-      return 0;
-    }
-    parseCommandLineAndGetShowJobParameters(args);
-    getAndPrintJobInfo();
-    return 0;
-  }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Localization.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Localization.java
deleted file mode 100644
index 0377450..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Localization.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param;
-
-import org.apache.commons.cli.ParseException;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Localization parameter.
- * */
-public class Localization {
-
-  private String mountPermissionPattern = "(wr|rw)$";
-  /**
-   * Regex for directory/file path in container.
-   * YARN only support absolute path for mount, but we can
-   * support some relative path.
-   * For relative path, we only allow ".", "./","./name".
-   * relative path like "./a/b" is not allowed.
-   * "." and "./" means original dir/file name in container working directory
-   * "./name" means use same or new "name" in container working directory
-   * A absolute path means same path in container filesystem
-   */
-  private String localPathPattern = "((^\\.$)|(^\\./$)|(^\\./[^/]+)|(^/.*))";
-  private String remoteUri;
-  private String localPath;
-
-  // Read write by default
-  private String mountPermission = "rw";
-
-  private static final List<String> SUPPORTED_SCHEME = Arrays.asList(
-      "hdfs", "oss", "s3a", "s3n", "wasb",
-      "wasbs", "abfs", "abfss", "adl", "har",
-      "ftp", "http", "https", "viewfs", "swebhdfs",
-      "webhdfs", "swift");
-
-  public void parse(String arg) throws ParseException {
-    String[] tokens = arg.split(":");
-    int minimum = "a:b".split(":").length;
-    int minimumWithPermission = "a:b:rw".split(":").length;
-    int minimumParts = minimum;
-    int miniPartsWithRemoteScheme = "scheme://a:b".split(":").length;
-    int maximumParts = "scheme://a:b:rw".split(":").length;
-    // If remote uri starts with a remote scheme
-    if (isSupportedScheme(tokens[0])) {
-      minimumParts = miniPartsWithRemoteScheme;
-    }
-    if (tokens.length < minimumParts
-        || tokens.length > maximumParts) {
-      throw new ParseException("Invalid parameter,"
-          + "should be \"remoteUri:localPath[:rw|:wr]\" "
-          + "format for --localizations");
-    }
-
-    /**
-     * RemoteUri starts with remote scheme.
-     * Merge part 0 and 1 to build a hdfs path in token[0].
-     * toke[1] will be localPath to ease following logic
-     * */
-    if (minimumParts == miniPartsWithRemoteScheme) {
-      tokens[0] = tokens[0] + ":" + tokens[1];
-      tokens[1] = tokens[2];
-      if (tokens.length == maximumParts) {
-        // Has permission part
-        mountPermission = tokens[maximumParts - 1];
-      }
-    }
-    // RemoteUri starts with linux file path
-    if (minimumParts == minimum
-        && tokens.length == minimumWithPermission) {
-      // Has permission part
-      mountPermission = tokens[minimumWithPermission - 1];
-    }
-    remoteUri = tokens[0];
-    localPath = tokens[1];
-    if (!localPath.matches(localPathPattern)) {
-      throw new ParseException("Invalid local file path:"
-          + localPath
-          + ", it only support \".\", \"./\", \"./name\" and "
-          + "absolute path.");
-    }
-    if (!mountPermission.matches(mountPermissionPattern)) {
-      throw new ParseException("Invalid mount permission (ro is not "
-          + "supported yet), " + mountPermission);
-    }
-  }
-
-  public String getRemoteUri() {
-    return remoteUri;
-  }
-
-  public Localization setRemoteUri(String rUti) {
-    this.remoteUri = rUti;
-    return this;
-  }
-
-  public String getLocalPath() {
-    return localPath;
-  }
-
-  public Localization setLocalPath(String lPath) {
-    this.localPath = lPath;
-    return this;
-  }
-
-  public String getMountPermission() {
-    return mountPermission;
-  }
-
-  public Localization setMountPermission(String mPermission) {
-    this.mountPermission = mPermission;
-    return this;
-  }
-
-  private boolean isSupportedScheme(String scheme) {
-    return SUPPORTED_SCHEME.contains(scheme);
-  }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
deleted file mode 100644
index 556dfe6..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
+++ /dev/null
@@ -1,524 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.Command;
-import org.apache.submarine.client.cli.param.runjob.MXNetRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.PyTorchRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
-import org.apache.submarine.client.cli.param.yaml.Configs;
-import org.apache.submarine.client.cli.param.yaml.Role;
-import org.apache.submarine.client.cli.param.yaml.Roles;
-import org.apache.submarine.client.cli.param.yaml.Scheduling;
-import org.apache.submarine.client.cli.param.yaml.Security;
-import org.apache.submarine.client.cli.param.yaml.TensorBoard;
-import org.apache.submarine.client.cli.param.yaml.YamlConfigFile;
-import org.apache.submarine.client.cli.param.yaml.YamlParseException;
-import org.apache.submarine.commons.runtime.param.BaseParameters;
-import org.apache.submarine.commons.runtime.Framework;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.param.Parameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.submarine.client.cli.runjob.RunJobCli.YAML_PARSE_FAILED;
-
-/**
- * This class acts as a wrapper of {@code CommandLine} values along with
- * YAML configuration values.
- * YAML configuration is only stored if the -f &lt;filename&gt;
- * option is specified along the CLI arguments.
- * Using this wrapper class makes easy to deal with
- * any form of configuration source potentially added into Submarine,
- * in the future.
- * If both YAML and CLI value is found for a config, this is an error case.
- */
-public final class ParametersHolder implements Parameter {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ParametersHolder.class);
-
-  public static final String SUPPORTED_FRAMEWORKS_MESSAGE =
-      "TensorFlow, PyTorch, MXNet are the only supported frameworks for now!";
-  public static final String SUPPORTED_COMMANDS_MESSAGE =
-      "'Show job' and 'run job' are the only supported commands for now!";
-
-
-
-  private CommandLine parsedCommandLine;
-  private Map<String, String> yamlStringConfigs;
-  private Map<String, List<String>> yamlListConfigs;
-  private ConfigType configType;
-  private Command command;
-  private final Set onlyDefinedWithCliArgs = ImmutableSet.of(
-      CliConstants.VERBOSE);
-  private Framework framework;
-  private BaseParameters parameters;
-
-  private ParametersHolder(CommandLine parsedCommandLine,
-      YamlConfigFile yamlConfig, ConfigType configType, Command command)
-      throws ParseException, YarnException {
-    this.parsedCommandLine = parsedCommandLine;
-    this.yamlStringConfigs = initStringConfigValues(yamlConfig);
-    this.yamlListConfigs = initListConfigValues(yamlConfig);
-    this.configType = configType;
-    this.command = command;
-    this.framework = determineFrameworkType();
-    this.ensureOnlyValidSectionsAreDefined(yamlConfig);
-    this.parameters = createParameters();
-  }
-
-  private ParametersHolder(){
-    super();
-  }
-
-  private BaseParameters createParameters() {
-    if (command == Command.RUN_JOB) {
-      if (framework == Framework.TENSORFLOW) {
-        return new TensorFlowRunJobParameters();
-      } else if (framework == Framework.PYTORCH) {
-        return new PyTorchRunJobParameters();
-      } else if (framework == Framework.MXNET) {
-        return new MXNetRunJobParameters();
-      } else {
-        throw new UnsupportedOperationException(SUPPORTED_FRAMEWORKS_MESSAGE);
-      }
-    } else if (command == Command.SHOW_JOB) {
-      return new ShowJobParameters();
-    } else {
-      throw new UnsupportedOperationException(SUPPORTED_COMMANDS_MESSAGE);
-    }
-  }
-
-  private void ensureOnlyValidSectionsAreDefined(YamlConfigFile yamlConfig) {
-    if (isCommandRunJob() && isFrameworkPyTorch() &&
-        isPsSectionDefined(yamlConfig)) {
-      throw new YamlParseException(
-          "PS section should not be defined when PyTorch " +
-              "is the selected framework!");
-    }
-
-    if (isCommandRunJob() && (isFrameworkPyTorch() || isFrameworkMXNet())
-        && isTensorboardSectionDefined(yamlConfig)) {
-      throw new YamlParseException(
-          "TensorBoard section should not be defined when TensorFlow " +
-              "is not the selected framework!");
-    }
-
-    if (isCommandRunJob() && !isFrameworkMXNet() &&
-        isSchedulerSectionDefined(yamlConfig)) {
-      throw new YamlParseException(
-          "Scheduler section should not be defined when MXNet " +
-              "is not the selected framework!");
-    }
-  }
-
-  private boolean isCommandRunJob() {
-    return command == Command.RUN_JOB;
-  }
-
-  private boolean isFrameworkPyTorch() {
-    return framework == Framework.PYTORCH;
-  }
-
-  private boolean isFrameworkMXNet() {
-    return framework == Framework.MXNET;
-  }
-
-  private boolean isPsSectionDefined(YamlConfigFile yamlConfig) {
-    return yamlConfig != null &&
-        yamlConfig.getRoles() != null &&
-        yamlConfig.getRoles().getPs() != null;
-  }
-
-  private boolean isTensorboardSectionDefined(YamlConfigFile yamlConfig) {
-    return yamlConfig != null &&
-        yamlConfig.getTensorBoard() != null;
-  }
-
-  private boolean isSchedulerSectionDefined(YamlConfigFile yamlConfig) {
-    return yamlConfig != null &&
-        yamlConfig.getRoles() != null &&
-        yamlConfig.getRoles().getScheduler() != null;
-  }
-
-  private Framework determineFrameworkType()
-      throws ParseException, YarnException {
-    if (!isCommandRunJob()) {
-      return null;
-    }
-    String frameworkStr = getOptionValue(CliConstants.FRAMEWORK);
-    if (frameworkStr == null) {
-      LOG.info("Framework is not defined in config, falling back to " +
-          "TensorFlow as a default.");
-      return Framework.TENSORFLOW;
-    }
-    Framework framework = Framework.parseByValue(frameworkStr);
-    if (framework == null) {
-      if (getConfigType() == ConfigType.CLI) {
-        throw new ParseException("Failed to parse Framework type! "
-            + "Valid values are: " + Framework.getValues());
-      } else {
-        throw new YamlParseException(YAML_PARSE_FAILED +
-            ", framework should is defined, but it has an invalid value! " +
-            "Valid values are: " + Framework.getValues());
-      }
-    }
-    return framework;
-  }
-
-  /**
-   * Maps every value coming from the passed yamlConfig to {@code CliConstants}.
-   * @param yamlConfig Parsed YAML config
-   * @return A map of config values, keys are {@code CliConstants}
-   * and values are Strings.
-   */
-  private Map<String, String> initStringConfigValues(
-      YamlConfigFile yamlConfig) {
-    if (yamlConfig == null) {
-      return Collections.emptyMap();
-    }
-    Map<String, String> yamlConfigValues = Maps.newHashMap();
-    Roles roles = yamlConfig.getRoles();
-
-    initGenericConfigs(yamlConfig, yamlConfigValues);
-    initPs(yamlConfigValues, roles.getPs());
-    initWorker(yamlConfigValues, roles.getWorker());
-    initScheduler(yamlConfigValues, roles.getScheduler());
-    initScheduling(yamlConfigValues, yamlConfig.getScheduling());
-    initSecurity(yamlConfigValues, yamlConfig.getSecurity());
-    initTensorBoard(yamlConfigValues, yamlConfig.getTensorBoard());
-
-    return yamlConfigValues;
-  }
-
-  private Map<String, List<String>> initListConfigValues(
-      YamlConfigFile yamlConfig) {
-    if (yamlConfig == null) {
-      return Collections.emptyMap();
-    }
-
-    Map<String, List<String>> yamlConfigValues = Maps.newHashMap();
-    Configs configs = yamlConfig.getConfigs();
-    yamlConfigValues.put(CliConstants.LOCALIZATION, configs.getLocalizations());
-    yamlConfigValues.put(CliConstants.ENV,
-        convertToEnvsList(configs.getEnvs()));
-    yamlConfigValues.put(CliConstants.QUICKLINK, configs.getQuicklinks());
-
-    return yamlConfigValues;
-  }
-
-  private void initGenericConfigs(YamlConfigFile yamlConfig,
-      Map<String, String> yamlConfigs) {
-    yamlConfigs.put(CliConstants.NAME, yamlConfig.getSpec().getName());
-    yamlConfigs.put(CliConstants.FRAMEWORK,
-        yamlConfig.getSpec().getFramework());
-
-    Configs configs = yamlConfig.getConfigs();
-    yamlConfigs.put(CliConstants.INPUT_PATH, configs.getInputPath());
-    yamlConfigs.put(CliConstants.CHECKPOINT_PATH, configs.getCheckpointPath());
-    yamlConfigs.put(CliConstants.SAVED_MODEL_PATH, configs.getSavedModelPath());
-    yamlConfigs.put(CliConstants.DOCKER_IMAGE, configs.getDockerImage());
-    yamlConfigs.put(CliConstants.WAIT_JOB_FINISH, configs.getWaitJobFinish());
-  }
-
-  private void initPs(Map<String, String> yamlConfigs, Role ps) {
-    if (ps == null) {
-      return;
-    }
-    yamlConfigs.put(CliConstants.N_PS, String.valueOf(ps.getReplicas()));
-    yamlConfigs.put(CliConstants.PS_RES, ps.getResources());
-    yamlConfigs.put(CliConstants.PS_DOCKER_IMAGE, ps.getDockerImage());
-    yamlConfigs.put(CliConstants.PS_LAUNCH_CMD, ps.getLaunchCmd());
-  }
-
-  private void initWorker(Map<String, String> yamlConfigs, Role worker) {
-    if (worker == null) {
-      return;
-    }
-    yamlConfigs.put(CliConstants.N_WORKERS,
-        String.valueOf(worker.getReplicas()));
-    yamlConfigs.put(CliConstants.WORKER_RES, worker.getResources());
-    yamlConfigs.put(CliConstants.WORKER_DOCKER_IMAGE, worker.getDockerImage());
-    yamlConfigs.put(CliConstants.WORKER_LAUNCH_CMD, worker.getLaunchCmd());
-  }
-
-  private void initScheduler(Map<String, String> yamlConfigs, Role scheduler) {
-    if (scheduler == null) {
-      return;
-    }
-    yamlConfigs.put(CliConstants.N_SCHEDULERS,
-            String.valueOf(scheduler.getReplicas()));
-    yamlConfigs.put(CliConstants.SCHEDULER_RES, scheduler.getResources());
-    yamlConfigs.put(CliConstants.SCHEDULER_DOCKER_IMAGE, scheduler.getDockerImage());
-    yamlConfigs.put(CliConstants.SCHEDULER_LAUNCH_CMD, scheduler.getLaunchCmd());
-  }
-
-  private void initScheduling(Map<String, String> yamlConfigValues,
-      Scheduling scheduling) {
-    if (scheduling == null) {
-      return;
-    }
-    yamlConfigValues.put(CliConstants.QUEUE, scheduling.getQueue());
-  }
-
-  private void initSecurity(Map<String, String> yamlConfigValues,
-      Security security) {
-    if (security == null) {
-      return;
-    }
-    yamlConfigValues.put(CliConstants.KEYTAB, security.getKeytab());
-    yamlConfigValues.put(CliConstants.PRINCIPAL, security.getPrincipal());
-    yamlConfigValues.put(CliConstants.DISTRIBUTE_KEYTAB,
-        String.valueOf(security.isDistributeKeytab()));
-  }
-
-  private void initTensorBoard(Map<String, String> yamlConfigValues,
-      TensorBoard tensorBoard) {
-    if (tensorBoard == null) {
-      return;
-    }
-    yamlConfigValues.put(CliConstants.TENSORBOARD, Boolean.TRUE.toString());
-    yamlConfigValues.put(CliConstants.TENSORBOARD_DOCKER_IMAGE,
-        tensorBoard.getDockerImage());
-    yamlConfigValues.put(CliConstants.TENSORBOARD_RESOURCES,
-        tensorBoard.getResources());
-  }
-
-  private List<String> convertToEnvsList(Map<String, String> envs) {
-    if (envs == null) {
-      return Collections.emptyList();
-    }
-    return envs.entrySet().stream()
-        .map(e -> String.format("%s=%s", e.getKey(), e.getValue()))
-        .collect(Collectors.toList());
-  }
-
-  public static ParametersHolder create() {
-    return new ParametersHolder();
-  }
-
-  public static ParametersHolder createWithCmdLine(CommandLine cli,
-      Command command) throws ParseException, YarnException {
-    return new ParametersHolder(cli, null, ConfigType.CLI, command);
-  }
-
-  public static ParametersHolder createWithCmdLineAndYaml(CommandLine cli,
-      YamlConfigFile yamlConfig, Command command) throws ParseException,
-      YarnException {
-    return new ParametersHolder(cli, yamlConfig, ConfigType.YAML, command);
-  }
-
-  /**
-   * Gets the option value, either from the CLI arguments or YAML config,
-   * if present.
-   * @param option Name of the config.
-   * @return The value of the config
-   */
-  public String getOptionValue(String option) throws YarnException {
-    ensureConfigIsDefinedOnce(option, true);
-    if (onlyDefinedWithCliArgs.contains(option) ||
-        parsedCommandLine.hasOption(option)) {
-      return getValueFromCLI(option);
-    }
-    return getValueFromYaml(option);
-  }
-
-  /**
-   * Gets the option values, either from the CLI arguments or YAML config,
-   * if present.
-   * @param option Name of the config.
-   * @return The values of the config
-   */
-  public List<String> getOptionValues(String option) throws YarnException {
-    ensureConfigIsDefinedOnce(option, false);
-    if (onlyDefinedWithCliArgs.contains(option) ||
-        parsedCommandLine.hasOption(option)) {
-      return getValuesFromCLI(option);
-    }
-    return getValuesFromYaml(option);
-  }
-
-  private void ensureConfigIsDefinedOnce(String option, boolean stringValue)
-      throws YarnException {
-    boolean definedWithYaml;
-    if (stringValue) {
-      definedWithYaml = yamlStringConfigs.containsKey(option);
-    } else {
-      definedWithYaml = yamlListConfigs.containsKey(option);
-    }
-
-    if (parsedCommandLine.hasOption(option) && definedWithYaml) {
-      throw new YarnException("Config '%s' is defined both with YAML config" +
-          " and with CLI argument, please only use either way!");
-    }
-  }
-
-  private String getValueFromCLI(String option) {
-    String value = parsedCommandLine.getOptionValue(option);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Found config value {} for key {} " +
-          "from CLI configuration.", value, option);
-    }
-    return value;
-  }
-
-  private List<String> getValuesFromCLI(String option) {
-    String[] optionValues = parsedCommandLine.getOptionValues(option);
-    if (optionValues != null) {
-      List<String> values = Arrays.asList(optionValues);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Found config values {} for key {} " +
-            "from CLI configuration.", values, option);
-      }
-      return values;
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No config values found for key {} " +
-            "from CLI configuration.", option);
-      }
-      return Lists.newArrayList();
-    }
-  }
-
-  private String getValueFromYaml(String option) {
-    String value = yamlStringConfigs.get(option);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Found config value {} for key {} " +
-          "from YAML configuration.", value, option);
-    }
-    return value;
-  }
-
-  private List<String> getValuesFromYaml(String option) {
-    List<String> values = yamlListConfigs.get(option);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Found config values {} for key {} " +
-          "from YAML configuration.", values, option);
-    }
-    return values;
-  }
-
-  /**
-   * Returns the boolean value of option.
-   * First, we check if the CLI value is defined for the option.
-   * If not, then we check the YAML value.
-   * @param option name of the option
-   * @return true, if the option is found in the CLI args or in the YAML config,
-   * false otherwise.
-   */
-  public boolean hasOption(String option) {
-    if (onlyDefinedWithCliArgs.contains(option)) {
-      boolean value = parsedCommandLine.hasOption(option);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Found boolean config with value {} for key {} " +
-            "from CLI configuration.", value, option);
-      }
-      return value;
-    }
-    if (parsedCommandLine.hasOption(option)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Found boolean config value for key {} " +
-            "from CLI configuration.", option);
-      }
-      return true;
-    }
-    return getBooleanValueFromYaml(option);
-  }
-
-  private boolean getBooleanValueFromYaml(String option) {
-    String stringValue = yamlStringConfigs.get(option);
-    boolean result = stringValue != null
-        && Boolean.valueOf(stringValue).equals(Boolean.TRUE);
-    LOG.debug("Found config value {} for key {} " +
-        "from YAML configuration.", result, option);
-    return result;
-  }
-
-  public ConfigType getConfigType() {
-    return configType;
-  }
-
-  public Framework getFramework() {
-    return framework;
-  }
-
-  @Override
-  public Parameter setFramework(Framework framework) {
-    this.framework = framework;
-    return this;
-  }
-
-  public void updateParameters(ClientContext clientContext)
-      throws ParseException, YarnException, IOException {
-    parameters.updateParameters(this, clientContext);
-  }
-
-  public BaseParameters getParameters() {
-    return parameters;
-  }
-
-  public Parameter setParameters(BaseParameters parameters) {
-    this.parameters = parameters;
-    return this;
-  }
-
-  public CommandLine getParsedCommandLine() {
-    return parsedCommandLine;
-  }
-
-  public Parameter setParsedCommandLine(CommandLine parsedCommandLine) {
-    this.parsedCommandLine = parsedCommandLine;
-    return this;
-  }
-
-  public Map<String, String> getYamlStringConfigs() {
-    return yamlStringConfigs;
-  }
-
-  public Parameter setYamlStringConfigs(Map<String, String> yamlStringConfigs) {
-    this.yamlStringConfigs = yamlStringConfigs;
-    return this;
-  }
-
-  public Map<String, List<String>> getYamlListConfigs() {
-    return yamlListConfigs;
-  }
-
-  public Parameter setYamlListConfigs(
-      Map<String, List<String>> yamlListConfigs) {
-    this.yamlListConfigs = yamlListConfigs;
-    return this;
-  }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Quicklink.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Quicklink.java
deleted file mode 100644
index 4fe035c..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Quicklink.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param;
-
-import org.apache.commons.cli.ParseException;
-
-/**
- * A class represents quick links to a web page.
- */
-public class Quicklink {
-  private String label;
-  private String componentInstanceName;
-  private String protocol;
-  private int port;
-
-  public void parse(String quicklinkStr) throws ParseException {
-    if (!quicklinkStr.contains("=")) {
-      throw new ParseException("Should be <label>=<link> format for quicklink");
-    }
-
-    int index = quicklinkStr.indexOf("=");
-    label = quicklinkStr.substring(0, index);
-    quicklinkStr = quicklinkStr.substring(index + 1);
-
-    if (quicklinkStr.startsWith("http://")) {
-      protocol = "http://";
-    } else if (quicklinkStr.startsWith("https://")) {
-      protocol = "https://";
-    } else {
-      throw new ParseException("Quicklink should start with http or https");
-    }
-
-    quicklinkStr = quicklinkStr.substring(protocol.length());
-    index = quicklinkStr.indexOf(":");
-
-    if (index == -1) {
-      throw new ParseException("Quicklink should be componet-id:port form");
-    }
-
-    componentInstanceName = quicklinkStr.substring(0, index);
-    port = Integer.parseInt(quicklinkStr.substring(index + 1));
-  }
-
-  public String getLabel() {
-    return label;
-  }
-
-  public Quicklink setLabel(String label) {
-    this.label = label;
-    return this;
-  }
-
-  public String getComponentInstanceName() {
-    return componentInstanceName;
-  }
-
-  public Quicklink setComponentInstanceName(String componentInstanceName) {
-    this.componentInstanceName = componentInstanceName;
-    return this;
-  }
-
-  public String getProtocol() {
-    return protocol;
-  }
-
-  public Quicklink setProtocol(String protocol) {
-    this.protocol = protocol;
-    return this;
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  public Quicklink setPort(int port) {
-    this.port = port;
-    return this;
-  }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/RunParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/RunParameters.java
deleted file mode 100644
index 2bfbb9e..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/RunParameters.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.commons.runtime.param.BaseParameters;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.param.Parameter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Parameters required to run anything on cluster. Such as run job / serve model
- */
-public abstract class RunParameters extends BaseParameters {
-  private String savedModelPath;
-  private String dockerImageName;
-  private List<String> envars = new ArrayList<>();
-  private String queue;
-
-  @Override
-  public void updateParameters(Parameter parametersHolder, ClientContext clientContext)
-		  throws ParseException,
-      IOException, YarnException {
-    String savedModelPath = parametersHolder.getOptionValue(
-        CliConstants.SAVED_MODEL_PATH);
-    this.setSavedModelPath(savedModelPath);
-
-    List<String> envVars = getEnvVars((ParametersHolder)parametersHolder);
-    this.setEnvars(envVars);
-
-    String queue = parametersHolder.getOptionValue(
-        CliConstants.QUEUE);
-    this.setQueue(queue);
-
-    String dockerImage = parametersHolder.getOptionValue(
-        CliConstants.DOCKER_IMAGE);
-    this.setDockerImageName(dockerImage);
-
-    super.updateParameters(parametersHolder, clientContext);
-  }
-
-  private List<String> getEnvVars(ParametersHolder parametersHolder)
-      throws YarnException {
-    List<String> result = new ArrayList<>();
-    List<String> envVarsArray = parametersHolder.getOptionValues(
-        CliConstants.ENV);
-    if (envVarsArray != null) {
-      result.addAll(envVarsArray);
-    }
-    return result;
-  }
-
-  public String getQueue() {
-    return queue;
-  }
-
-  public RunParameters setQueue(String queue) {
-    this.queue = queue;
-    return this;
-  }
-
-  public String getDockerImageName() {
-    return dockerImageName;
-  }
-
-  public RunParameters setDockerImageName(String dockerImageName) {
-    this.dockerImageName = dockerImageName;
-    return this;
-  }
-
-
-  public List<String> getEnvars() {
-    return envars;
-  }
-
-  public RunParameters setEnvars(List<String> envars) {
-    this.envars = envars;
-    return this;
-  }
-
-  public String getSavedModelPath() {
-    return savedModelPath;
-  }
-
-  public RunParameters setSavedModelPath(String savedModelPath) {
-    this.savedModelPath = savedModelPath;
-    return this;
-  }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ShowJobParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ShowJobParameters.java
deleted file mode 100644
index e71b3d7..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ShowJobParameters.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param;
-
-import org.apache.submarine.commons.runtime.param.BaseParameters;
-
-public class ShowJobParameters extends BaseParameters {
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/MXNetRunJobParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/MXNetRunJobParameters.java
deleted file mode 100644
index 4c4671b..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/MXNetRunJobParameters.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param.runjob;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.CliUtils;
-import org.apache.submarine.client.cli.runjob.RoleParameters;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.api.MXNetRole;
-import org.apache.submarine.commons.runtime.param.Parameter;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Parameters for MXNet job.
- */
-public class MXNetRunJobParameters extends RunJobParameters {
-    private RoleParameters psParameters =
-            RoleParameters.createEmpty(MXNetRole.PS);
-
-    private RoleParameters schedulerParameters =
-            RoleParameters.createEmpty(MXNetRole.SCHEDULER);
-
-    private static final String CANNOT_BE_DEFINED_FOR_MXNET =
-            "cannot be defined for MXNet jobs!";
-
-    @Override
-    public void updateParameters(Parameter parametersHolder, ClientContext clientContext)
-            throws ParseException, IOException, YarnException {
-
-        checkArguments(parametersHolder);
-        super.updateParameters(parametersHolder, clientContext);
-
-        String input = parametersHolder.getOptionValue(CliConstants.INPUT_PATH);
-        this.workerParameters = generateWorkerParameters(clientContext, parametersHolder, input);
-        this.psParameters = getPSParameters(parametersHolder);
-        this.schedulerParameters = getSchedulerParameters(parametersHolder);
-        this.distributed = determineIfDistributed(workerParameters.getReplicas(),
-                psParameters.getReplicas(), schedulerParameters.getReplicas());
-
-        executePostOperations(clientContext);
-    }
-
-    @Override
-    void executePostOperations(ClientContext clientContext) throws IOException {
-        // Set default job dir / saved model dir, etc.
-        setDefaultDirs(clientContext);
-        replacePatternsInParameters(clientContext);
-    }
-
-    @Override
-    public List<String> getLaunchCommands() {
-        return Lists.newArrayList(getWorkerLaunchCmd(), getPSLaunchCmd(), getSchedulerLaunchCmd());
-    }
-
-    private void replacePatternsInParameters(ClientContext clientContext)
-            throws IOException {
-        if (StringUtils.isNotEmpty(getPSLaunchCmd())) {
-            String afterReplace =
-                    CliUtils.replacePatternsInLaunchCommand(getPSLaunchCmd(), this,
-                            clientContext.getRemoteDirectoryManager());
-            setPSLaunchCmd(afterReplace);
-        }
-        if (StringUtils.isNotEmpty(getWorkerLaunchCmd())) {
-            String afterReplace =
-                    CliUtils.replacePatternsInLaunchCommand(getWorkerLaunchCmd(), this,
-                            clientContext.getRemoteDirectoryManager());
-            setWorkerLaunchCmd(afterReplace);
-        }
-        if (StringUtils.isNotEmpty(getSchedulerLaunchCmd())) {
-            String afterReplace =
-                    CliUtils.replacePatternsInLaunchCommand(getSchedulerLaunchCmd(), this,
-                            clientContext.getRemoteDirectoryManager());
-            setSchedulerLaunchCmd(afterReplace);
-        }
-    }
-
-    private void checkArguments(Parameter parametersHolder)
-        throws YarnException, ParseException {
-        if (parametersHolder.hasOption(CliConstants.TENSORBOARD)) {
-            throw new ParseException(getParamCannotBeDefinedErrorMessage(
-                    CliConstants.TENSORBOARD));
-        } else if (parametersHolder
-                .getOptionValue(CliConstants.TENSORBOARD_RESOURCES) != null) {
-            throw new ParseException(getParamCannotBeDefinedErrorMessage(
-                    CliConstants.TENSORBOARD_RESOURCES));
-        } else if (parametersHolder
-                .getOptionValue(CliConstants.TENSORBOARD_DOCKER_IMAGE) != null) {
-            throw new ParseException(getParamCannotBeDefinedErrorMessage(
-                    CliConstants.TENSORBOARD_DOCKER_IMAGE));
-        }
-    }
-
-    private boolean determineIfDistributed(int nWorkers, int nPS, int nSchedulers) {
-        return nWorkers >= 2 && nPS > 0 && nSchedulers == 1;
-    }
-
-    private String getParamCannotBeDefinedErrorMessage(String cliName) {
-        return String.format(
-                "Parameter '%s' " + CANNOT_BE_DEFINED_FOR_MXNET, cliName);
-    }
-
-    private RoleParameters getPSParameters(Parameter parametersHolder)
-            throws YarnException, ParseException {
-        int nPS = getNumberOfPS(parametersHolder);
-        Resource psResource =
-                determinePSResource(parametersHolder, nPS);
-        String psDockerImage =
-                parametersHolder.getOptionValue(CliConstants.PS_DOCKER_IMAGE);
-        String psLaunchCommand =
-                parametersHolder.getOptionValue(CliConstants.PS_LAUNCH_CMD);
-        return new RoleParameters(MXNetRole.PS, nPS, psLaunchCommand,
-                psDockerImage, psResource);
-    }
-
-    private Resource determinePSResource(Parameter parametersHolder, int nPS)
-            throws ParseException, YarnException {
-        if (nPS > 0) {
-            String psResourceStr =
-                    parametersHolder.getOptionValue(CliConstants.PS_RES);
-            if (psResourceStr == null) {
-                throw new ParseException("--" + CliConstants.PS_RES + " is absent.");
-            }
-            return ResourceUtils.createResourceFromString(psResourceStr);
-        }
-        return null;
-    }
-
-    public String getPSLaunchCmd() {
-        return psParameters.getLaunchCommand();
-    }
-
-    public void setPSLaunchCmd(String launchCmd) {
-        psParameters.setLaunchCommand(launchCmd);
-    }
-
-    private int getNumberOfPS(Parameter parametersHolder) throws YarnException {
-        int nPS = 0;
-        if (parametersHolder.getOptionValue(CliConstants.N_PS) != null) {
-            nPS = Integer.parseInt(parametersHolder.getOptionValue(CliConstants.N_PS));
-        }
-        return nPS;
-    }
-
-    private RoleParameters getSchedulerParameters(Parameter parametersHolder)
-            throws YarnException, ParseException {
-        int nSchedulers = getNumberOfScheduler(parametersHolder);
-        Resource schedulerResource =
-                determineSchedulerResource(parametersHolder, nSchedulers);
-        String schedulerDockerImage =
-                parametersHolder.getOptionValue(CliConstants.SCHEDULER_DOCKER_IMAGE);
-        String schedulerLaunchCommand =
-                parametersHolder.getOptionValue(CliConstants.SCHEDULER_LAUNCH_CMD);
-        return new RoleParameters(MXNetRole.SCHEDULER, nSchedulers, schedulerLaunchCommand,
-                schedulerDockerImage, schedulerResource);
-    }
-
-    private Resource determineSchedulerResource(Parameter parametersHolder, int nSchedulers)
-            throws ParseException, YarnException {
-        if (nSchedulers > 0) {
-            String schedulerResourceStr = parametersHolder.getOptionValue(CliConstants.SCHEDULER_RES);
-            if (schedulerResourceStr == null) {
-                throw new ParseException("--" + CliConstants.SCHEDULER_RES + " is absent.");
-            }
-            return ResourceUtils.createResourceFromString(schedulerResourceStr);
-        }
-        return null;
-    }
-
-    private int getNumberOfScheduler(Parameter parametersHolder) throws ParseException, YarnException {
-        int nSchedulers = 0;
-        if (parametersHolder.getOptionValue(CliConstants.N_SCHEDULERS) != null) {
-            nSchedulers = Integer.parseInt(parametersHolder.getOptionValue(CliConstants.N_SCHEDULERS));
-            if (nSchedulers > 1 || nSchedulers < 0) {
-                throw new ParseException("--" + CliConstants.N_SCHEDULERS + " should be 1 or 0");
-            }
-        }
-        return nSchedulers;
-    }
-
-    public String getSchedulerLaunchCmd() {
-        return schedulerParameters.getLaunchCommand();
-    }
-
-    public void setSchedulerLaunchCmd(String launchCmd) {
-        schedulerParameters.setLaunchCommand(launchCmd);
-    }
-
-    public int getNumPS() {
-        return psParameters.getReplicas();
-    }
-
-    public Resource getPsResource() {
-        return psParameters.getResource();
-    }
-
-    public String getPsDockerImage() {
-        return psParameters.getDockerImage();
-    }
-
-    public int getNumSchedulers() {
-        return schedulerParameters.getReplicas();
-    }
-
-    public Resource getSchedulerResource() {
-        return schedulerParameters.getResource();
-    }
-
-    public String getSchedulerDockerImage() {
-        return schedulerParameters.getDockerImage();
-    }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/PyTorchRunJobParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/PyTorchRunJobParameters.java
deleted file mode 100644
index ad0b64f..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/PyTorchRunJobParameters.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param.runjob;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.CliUtils;
-import org.apache.submarine.commons.runtime.ClientContext;
-
-import com.google.common.collect.Lists;
-import org.apache.submarine.commons.runtime.param.Parameter;
-
-/**
- * Parameters for PyTorch job.
- */
-public class PyTorchRunJobParameters extends RunJobParameters {
-
-  private static final String CANNOT_BE_DEFINED_FOR_PYTORCH =
-      "cannot be defined for PyTorch jobs!";
-
-  @Override
-  public void updateParameters(Parameter parametersHolder, ClientContext clientContext)
-      throws ParseException, IOException, YarnException {
-    checkArguments(parametersHolder);
-
-    super.updateParameters(parametersHolder, clientContext);
-
-    String input = parametersHolder.getOptionValue(CliConstants.INPUT_PATH);
-    this.workerParameters =
-        generateWorkerParameters(clientContext, parametersHolder, input);
-    this.distributed = determineIfDistributed(workerParameters.getReplicas());
-    executePostOperations(clientContext);
-  }
-
-  private void checkArguments(Parameter parametersHolder)
-      throws YarnException, ParseException {
-    if (parametersHolder.getOptionValue(CliConstants.N_PS) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.N_PS));
-    } else if (parametersHolder.getOptionValue(CliConstants.PS_RES) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.PS_RES));
-    } else if (parametersHolder
-        .getOptionValue(CliConstants.PS_DOCKER_IMAGE) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.PS_DOCKER_IMAGE));
-    } else if (parametersHolder
-        .getOptionValue(CliConstants.PS_LAUNCH_CMD) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.PS_LAUNCH_CMD));
-    } else if (parametersHolder.hasOption(CliConstants.TENSORBOARD)) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.TENSORBOARD));
-    } else if (parametersHolder
-        .getOptionValue(CliConstants.TENSORBOARD_RESOURCES) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.TENSORBOARD_RESOURCES));
-    } else if (parametersHolder
-        .getOptionValue(CliConstants.TENSORBOARD_DOCKER_IMAGE) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.TENSORBOARD_DOCKER_IMAGE));
-    } else if (parametersHolder.getOptionValue(CliConstants.N_SCHEDULERS) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.N_SCHEDULERS));
-    } else if (parametersHolder.getOptionValue(CliConstants.SCHEDULER_RES) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.SCHEDULER_RES));
-    } else if (parametersHolder
-        .getOptionValue(CliConstants.SCHEDULER_DOCKER_IMAGE) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.SCHEDULER_DOCKER_IMAGE));
-    } else if (parametersHolder
-        .getOptionValue(CliConstants.SCHEDULER_LAUNCH_CMD) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.SCHEDULER_LAUNCH_CMD));
-    }
-  }
-
-  private String getParamCannotBeDefinedErrorMessage(String cliName) {
-    return String.format(
-        "Parameter '%s' " + CANNOT_BE_DEFINED_FOR_PYTORCH, cliName);
-  }
-
-  @Override
-  void executePostOperations(ClientContext clientContext) throws IOException {
-    // Set default job dir / saved model dir, etc.
-    setDefaultDirs(clientContext);
-    replacePatternsInParameters(clientContext);
-  }
-
-  private void replacePatternsInParameters(ClientContext clientContext)
-      throws IOException {
-    if (StringUtils.isNotEmpty(getWorkerLaunchCmd())) {
-      String afterReplace =
-          CliUtils.replacePatternsInLaunchCommand(getWorkerLaunchCmd(), this,
-              clientContext.getRemoteDirectoryManager());
-      setWorkerLaunchCmd(afterReplace);
-    }
-  }
-
-  @Override
-  public List<String> getLaunchCommands() {
-    return Lists.newArrayList(getWorkerLaunchCmd());
-  }
-
-  /**
-   * We only support non-distributed PyTorch integration for now.
-   * @param nWorkers
-   * @return
-   */
-  private boolean determineIfDistributed(int nWorkers) {
-    return false;
-  }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/RunJobParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/RunJobParameters.java
deleted file mode 100644
index 64b01b1..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/RunJobParameters.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param.runjob;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.CaseFormat;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.CliUtils;
-import org.apache.submarine.client.cli.param.Localization;
-import org.apache.submarine.client.cli.param.Quicklink;
-import org.apache.submarine.client.cli.param.RunParameters;
-import org.apache.submarine.client.cli.runjob.RoleParameters;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.param.Parameter;
-import org.apache.submarine.commons.runtime.api.TensorFlowRole;
-import org.apache.submarine.commons.runtime.fs.RemoteDirectoryManager;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-import org.yaml.snakeyaml.introspector.Property;
-import org.yaml.snakeyaml.introspector.PropertyUtils;
-
-import java.beans.IntrospectionException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Parameters used to run a job
- */
-public abstract class RunJobParameters extends RunParameters {
-  private String input;
-  private String checkpointPath;
-
-  private List<Quicklink> quicklinks = new ArrayList<>();
-  private List<Localization> localizations = new ArrayList<>();
-
-  private boolean waitJobFinish = false;
-  protected boolean distributed = false;
-
-  private boolean securityDisabled = false;
-  private String keytab;
-  private String principal;
-  private boolean distributeKeytab = false;
-  private List<String> confPairs = new ArrayList<>();
-
-  RoleParameters workerParameters =
-      RoleParameters.createEmpty(TensorFlowRole.WORKER);
-
-  @Override
-  public void updateParameters(Parameter parametersHolder, ClientContext clientContext)
-      throws ParseException, IOException, YarnException {
-
-    String input = parametersHolder.getOptionValue(CliConstants.INPUT_PATH);
-    String jobDir = parametersHolder.getOptionValue(
-        CliConstants.CHECKPOINT_PATH);
-
-    if (parametersHolder.hasOption(CliConstants.INSECURE_CLUSTER)) {
-      setSecurityDisabled(true);
-    }
-
-    String kerberosKeytab = parametersHolder.getOptionValue(
-        CliConstants.KEYTAB);
-    String kerberosPrincipal = parametersHolder.getOptionValue(
-        CliConstants.PRINCIPAL);
-    CliUtils.doLoginIfSecure(kerberosKeytab, kerberosPrincipal);
-
-    if (parametersHolder.hasOption(CliConstants.WAIT_JOB_FINISH)) {
-      this.waitJobFinish = true;
-    }
-
-    // Quicklinks
-    List<String> quicklinkStrs = parametersHolder.getOptionValues(
-        CliConstants.QUICKLINK);
-    if (quicklinkStrs != null) {
-      for (String ql : quicklinkStrs) {
-        Quicklink quicklink = new Quicklink();
-        quicklink.parse(ql);
-        quicklinks.add(quicklink);
-      }
-    }
-
-    // Localizations
-    List<String> localizationsStr = parametersHolder.getOptionValues(
-        CliConstants.LOCALIZATION);
-    if (null != localizationsStr) {
-      for (String loc : localizationsStr) {
-        Localization localization = new Localization();
-        localization.parse(loc);
-        localizations.add(localization);
-      }
-    }
-    boolean distributeKerberosKeytab = parametersHolder.hasOption(CliConstants
-        .DISTRIBUTE_KEYTAB);
-
-    List<String> configPairs = parametersHolder
-        .getOptionValues(CliConstants.ARG_CONF);
-
-    this.setInputPath(input).setCheckpointPath(jobDir)
-        .setKeytab(kerberosKeytab)
-        .setPrincipal(kerberosPrincipal)
-        .setDistributeKeytab(distributeKerberosKeytab)
-        .setConfPairs(configPairs);
-
-    super.updateParameters(parametersHolder, clientContext);
-  }
-
-  abstract void executePostOperations(ClientContext clientContext)
-      throws IOException;
-
-  void setDefaultDirs(ClientContext clientContext) throws IOException {
-    // Create directories if needed
-    String jobDir = getCheckpointPath();
-    if (jobDir == null) {
-      jobDir = getJobDir(clientContext);
-      setCheckpointPath(jobDir);
-    }
-
-    if (getNumWorkers() > 0) {
-      String savedModelDir = getSavedModelPath();
-      if (savedModelDir == null) {
-        savedModelDir = jobDir;
-        setSavedModelPath(savedModelDir);
-      }
-    }
-  }
-
-  private String getJobDir(ClientContext clientContext) throws IOException {
-    RemoteDirectoryManager rdm = clientContext.getRemoteDirectoryManager();
-    if (getNumWorkers() > 0) {
-      return rdm.getJobCheckpointDir(getName(), true).toString();
-    } else {
-      // when #workers == 0, it means we only launch TB. In that case,
-      // point job dir to root dir so all job's metrics will be shown.
-      return rdm.getUserRootFolder().toString();
-    }
-  }
-
-  public abstract List<String> getLaunchCommands();
-
-  public String getInputPath() {
-    return input;
-  }
-
-  public RunJobParameters setInputPath(String input) {
-    this.input = input;
-    return this;
-  }
-
-  public String getCheckpointPath() {
-    return checkpointPath;
-  }
-
-  public RunJobParameters setCheckpointPath(String checkpointPath) {
-    this.checkpointPath = checkpointPath;
-    return this;
-  }
-
-  public boolean isWaitJobFinish() {
-    return waitJobFinish;
-  }
-
-  public RunJobParameters setWaitJobFinish(boolean waitJobFinish) {
-    this.waitJobFinish = waitJobFinish;
-    return this;
-  }
-
-  public List<Quicklink> getQuicklinks() {
-    return quicklinks;
-  }
-
-  public RunJobParameters setQuicklinks(List<Quicklink> quicklinks) {
-    this.quicklinks = quicklinks;
-    return this;
-  }
-
-  public List<Localization> getLocalizations() {
-    return localizations;
-  }
-
-  public RunJobParameters setLocalizations(List<Localization> localizations) {
-    this.localizations = localizations;
-    return this;
-  }
-
-  public String getKeytab() {
-    return keytab;
-  }
-
-  public RunJobParameters setKeytab(String kerberosKeytab) {
-    this.keytab = kerberosKeytab;
-    return this;
-  }
-
-  public String getPrincipal() {
-    return principal;
-  }
-
-  public RunJobParameters setPrincipal(String kerberosPrincipal) {
-    this.principal = kerberosPrincipal;
-    return this;
-  }
-
-  public boolean isSecurityDisabled() {
-    return securityDisabled;
-  }
-
-  public RunJobParameters setSecurityDisabled(boolean securityDisabled) {
-    this.securityDisabled = securityDisabled;
-    return this;
-  }
-
-  public boolean isDistributeKeytab() {
-    return distributeKeytab;
-  }
-
-  public RunJobParameters setDistributeKeytab(
-      boolean distributeKerberosKeytab) {
-    this.distributeKeytab = distributeKerberosKeytab;
-    return this;
-  }
-
-  public List<String> getConfPairs() {
-    return confPairs;
-  }
-
-  public RunJobParameters setConfPairs(List<String> confPairs) {
-    this.confPairs = confPairs;
-    return this;
-  }
-
-  public RunJobParameters setDistributed(boolean distributed) {
-    this.distributed = distributed;
-    return this;
-  }
-
-  RoleParameters generateWorkerParameters(ClientContext clientContext,
-      Parameter parametersHolder, String input)
-      throws ParseException, YarnException, IOException {
-    int nWorkers = getNumberOfWorkers(parametersHolder, input);
-    Resource workerResource =
-        determineWorkerResource(parametersHolder, nWorkers, clientContext);
-    String workerDockerImage =
-        parametersHolder.getOptionValue(CliConstants.WORKER_DOCKER_IMAGE);
-    String workerLaunchCmd =
-        parametersHolder.getOptionValue(CliConstants.WORKER_LAUNCH_CMD);
-    return new RoleParameters(TensorFlowRole.WORKER, nWorkers,
-        workerLaunchCmd, workerDockerImage, workerResource);
-  }
-
-  private Resource determineWorkerResource(Parameter parametersHolder,
-      int nWorkers, ClientContext clientContext)
-      throws ParseException, YarnException, IOException {
-    if (nWorkers > 0) {
-      String workerResourceStr =
-          parametersHolder.getOptionValue(CliConstants.WORKER_RES);
-      if (workerResourceStr == null) {
-        throw new ParseException(
-            "--" + CliConstants.WORKER_RES + " is absent.");
-      }
-      return ResourceUtils.createResourceFromString(workerResourceStr);
-    }
-    return null;
-  }
-
-  private int getNumberOfWorkers(Parameter parametersHolder,
-      String input) throws ParseException, YarnException {
-    int nWorkers = 1;
-    if (parametersHolder.getOptionValue(CliConstants.N_WORKERS) != null) {
-      nWorkers = Integer
-          .parseInt(parametersHolder.getOptionValue(CliConstants.N_WORKERS));
-      // Only check null value.
-      // Training job shouldn't ignore INPUT_PATH option
-      // But if nWorkers is 0, INPUT_PATH can be ignored because
-      // user can only run Tensorboard
-      if (null == input && 0 != nWorkers) {
-        throw new ParseException(
-            "\"--" + CliConstants.INPUT_PATH + "\" is absent");
-      }
-    }
-    return nWorkers;
-  }
-
-  public RoleParameters getWorkerParameter() {
-    return workerParameters;
-  }
-
-  public RunJobParameters setWorkerParameter(RoleParameters workerParameters) {
-    this.workerParameters = workerParameters;
-    return this;
-  }
-
-  public String getWorkerLaunchCmd() {
-    return workerParameters.getLaunchCommand();
-  }
-
-  public void setWorkerLaunchCmd(String launchCmd) {
-    workerParameters.setLaunchCommand(launchCmd);
-  }
-
-  public int getNumWorkers() {
-    return workerParameters.getReplicas();
-  }
-
-  public void setNumWorkers(int numWorkers) {
-    workerParameters.setReplicas(numWorkers);
-  }
-
-  public Resource getWorkerResource() {
-    return workerParameters.getResource();
-  }
-
-  public void setWorkerResource(Resource resource) {
-    workerParameters.setResource(resource);
-  }
-
-  public String getWorkerDockerImage() {
-    return workerParameters.getDockerImage();
-  }
-
-  public void setWorkerDockerImage(String image) {
-    workerParameters.setDockerImage(image);
-  }
-
-  public boolean isDistributed() {
-    return distributed;
-  }
-
-  @VisibleForTesting
-  public static class UnderscoreConverterPropertyUtils extends PropertyUtils {
-    @Override
-    public Property getProperty(Class<? extends Object> type, String name) {
-      if (name.indexOf('_') > -1) {
-        name = convertName(name);
-      }
-      return super.getProperty(type, name);
-    }
-
-    private static String convertName(String name) {
-      return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name);
-    }
-  }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/TensorFlowRunJobParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/TensorFlowRunJobParameters.java
deleted file mode 100644
index 925c3f3..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/TensorFlowRunJobParameters.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param.runjob;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.CliUtils;
-import org.apache.submarine.client.cli.runjob.RoleParameters;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.param.Parameter;
-import org.apache.submarine.commons.runtime.api.TensorFlowRole;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Parameters for TensorFlow job.
- */
-public class TensorFlowRunJobParameters extends RunJobParameters {
-  private boolean tensorboardEnabled;
-  private static final String CANNOT_BE_DEFINED_FOR_TF =
-      "cannot be defined for TensorFlow jobs!";
-  private RoleParameters psParameters =
-      RoleParameters.createEmpty(TensorFlowRole.PS);
-  private RoleParameters tensorBoardParameters =
-      RoleParameters.createEmpty(TensorFlowRole.TENSORBOARD);
-
-  @Override
-  public void updateParameters(Parameter parametersHolder, ClientContext clientContext)
-      throws ParseException, IOException, YarnException {
-    checkArguments(parametersHolder);
-    super.updateParameters(parametersHolder, clientContext);
-
-    String input = parametersHolder.getOptionValue(CliConstants.INPUT_PATH);
-    this.workerParameters =
-        generateWorkerParameters(clientContext, parametersHolder, input);
-    this.psParameters = getPSParameters(clientContext, parametersHolder);
-    this.distributed = determineIfDistributed(workerParameters.getReplicas(),
-        psParameters.getReplicas());
-
-    if (parametersHolder.hasOption(CliConstants.TENSORBOARD)) {
-      this.tensorboardEnabled = true;
-      this.tensorBoardParameters =
-          getTensorBoardParameters(parametersHolder, clientContext);
-    }
-    executePostOperations(clientContext);
-  }
-
-  @Override
-  void executePostOperations(ClientContext clientContext) throws IOException {
-    // Set default job dir / saved model dir, etc.
-    setDefaultDirs(clientContext);
-    replacePatternsInParameters(clientContext);
-  }
-
-  private void checkArguments(Parameter parametersHolder)
-      throws YarnException, ParseException {
-    if (parametersHolder.getOptionValue(CliConstants.N_SCHEDULERS) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.N_SCHEDULERS));
-    } else if (parametersHolder.getOptionValue(CliConstants.SCHEDULER_RES) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.SCHEDULER_RES));
-    } else if (parametersHolder
-        .getOptionValue(CliConstants.SCHEDULER_DOCKER_IMAGE) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.SCHEDULER_DOCKER_IMAGE));
-    } else if (parametersHolder
-        .getOptionValue(CliConstants.SCHEDULER_LAUNCH_CMD) != null) {
-      throw new ParseException(getParamCannotBeDefinedErrorMessage(
-          CliConstants.SCHEDULER_LAUNCH_CMD));
-    }
-  }
-
-  private String getParamCannotBeDefinedErrorMessage(String cliName) {
-    return String.format(
-        "Parameter '%s' " + CANNOT_BE_DEFINED_FOR_TF, cliName);
-  }
-
-  private void replacePatternsInParameters(ClientContext clientContext)
-      throws IOException {
-    if (StringUtils.isNotEmpty(getPSLaunchCmd())) {
-      String afterReplace = CliUtils.replacePatternsInLaunchCommand(
-          getPSLaunchCmd(), this, clientContext.getRemoteDirectoryManager());
-      setPSLaunchCmd(afterReplace);
-    }
-
-    if (StringUtils.isNotEmpty(getWorkerLaunchCmd())) {
-      String afterReplace =
-          CliUtils.replacePatternsInLaunchCommand(getWorkerLaunchCmd(), this,
-              clientContext.getRemoteDirectoryManager());
-      setWorkerLaunchCmd(afterReplace);
-    }
-  }
-
-  @Override
-  public List<String> getLaunchCommands() {
-    return Lists.newArrayList(getWorkerLaunchCmd(), getPSLaunchCmd());
-  }
-
-  private boolean determineIfDistributed(int nWorkers, int nPS)
-      throws ParseException {
-    // Check #workers and #ps.
-    // When distributed training is required
-    if (nWorkers >= 2 && nPS > 0) {
-      return true;
-    } else if (nWorkers <= 1 && nPS > 0) {
-      throw new ParseException("Only specified one worker but non-zero PS, "
-          + "please double check.");
-    }
-    return false;
-  }
-
-  private RoleParameters getPSParameters(ClientContext clientContext,
-      Parameter parametersHolder)
-      throws YarnException, IOException, ParseException {
-    int nPS = getNumberOfPS(parametersHolder);
-    Resource psResource =
-        determinePSResource(parametersHolder, nPS, clientContext);
-    String psDockerImage =
-        parametersHolder.getOptionValue(CliConstants.PS_DOCKER_IMAGE);
-    String psLaunchCommand =
-        parametersHolder.getOptionValue(CliConstants.PS_LAUNCH_CMD);
-    return new RoleParameters(TensorFlowRole.PS, nPS, psLaunchCommand,
-        psDockerImage, psResource);
-  }
-
-  private Resource determinePSResource(Parameter parametersHolder,
-      int nPS, ClientContext clientContext)
-      throws ParseException, YarnException, IOException {
-    if (nPS > 0) {
-      String psResourceStr =
-          parametersHolder.getOptionValue(CliConstants.PS_RES);
-      if (psResourceStr == null) {
-        throw new ParseException("--" + CliConstants.PS_RES + " is absent.");
-      }
-      return ResourceUtils.createResourceFromString(psResourceStr);
-    }
-    return null;
-  }
-
-  private int getNumberOfPS(Parameter parametersHolder)
-      throws YarnException {
-    int nPS = 0;
-    if (parametersHolder.getOptionValue(CliConstants.N_PS) != null) {
-      nPS =
-          Integer.parseInt(parametersHolder.getOptionValue(CliConstants.N_PS));
-    }
-    return nPS;
-  }
-
-  private RoleParameters getTensorBoardParameters(Parameter parametersHolder,
-      ClientContext clientContext) throws YarnException, IOException {
-    String tensorboardResourceStr =
-        parametersHolder.getOptionValue(CliConstants.TENSORBOARD_RESOURCES);
-    if (tensorboardResourceStr == null || tensorboardResourceStr.isEmpty()) {
-      tensorboardResourceStr = CliConstants.TENSORBOARD_DEFAULT_RESOURCES;
-    }
-    Resource tensorboardResource = ResourceUtils.createResourceFromString(
-            tensorboardResourceStr);
-    String tensorboardDockerImage =
-        parametersHolder.getOptionValue(CliConstants.TENSORBOARD_DOCKER_IMAGE);
-    return new RoleParameters(TensorFlowRole.TENSORBOARD, 1, null,
-        tensorboardDockerImage, tensorboardResource);
-  }
-
-  public RoleParameters getPsParameters() {
-    return psParameters;
-  }
-
-  public void setPsParameters(RoleParameters parameters) {
-    this.psParameters = parameters;
-  }
-
-  public int getNumPS() {
-    return psParameters.getReplicas();
-  }
-
-  public void setNumPS(int numPS) {
-    psParameters.setReplicas(numPS);
-  }
-
-  public Resource getPsResource() {
-    return psParameters.getResource();
-  }
-
-  public void setPsResource(Resource resource) {
-    psParameters.setResource(resource);
-  }
-
-  public String getPsDockerImage() {
-    return psParameters.getDockerImage();
-  }
-
-  public void setPsDockerImage(String image) {
-    psParameters.setDockerImage(image);
-  }
-
-  public String getPSLaunchCmd() {
-    return psParameters.getLaunchCommand();
-  }
-
-  public void setPSLaunchCmd(String launchCmd) {
-    psParameters.setLaunchCommand(launchCmd);
-  }
-
-  public RoleParameters getTensorBoardParameters() {
-    return tensorBoardParameters;
-  }
-
-  public void setTensorBoardParameters(RoleParameters tensorBoardParameters) {
-    this.tensorBoardParameters = tensorBoardParameters;
-  }
-
-  public boolean isTensorboardEnabled() {
-    return tensorboardEnabled;
-  }
-
-  public void setTensorboardEnabled(boolean tensorboardEnabled) {
-    this.tensorboardEnabled = tensorboardEnabled;
-  }
-
-  public Resource getTensorboardResource() {
-    return tensorBoardParameters.getResource();
-  }
-
-  public void setTensorboardResource(Resource resource) {
-    tensorBoardParameters.setResource(resource);
-  }
-
-  public String getTensorboardDockerImage() {
-    return tensorBoardParameters.getDockerImage();
-  }
-
-  public void setTensorboardDockerImage(String image) {
-    tensorBoardParameters.setDockerImage(image);
-  }
-
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RoleParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RoleParameters.java
deleted file mode 100644
index fe15d29..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RoleParameters.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.submarine.commons.runtime.api.Role;
-
-/**
- * This class encapsulates data related to a particular Role.
- * Some examples: TF Worker process, TF PS process or a PyTorch worker process.
- */
-public class RoleParameters {
-  private final Role role;
-  private int replicas;
-  private String launchCommand;
-  private String dockerImage;
-  private Resource resource;
-
-  public RoleParameters(Role role, int replicas,
-      String launchCommand, String dockerImage, Resource resource) {
-    this.role = role;
-    this.replicas = replicas;
-    this.launchCommand = launchCommand;
-    this.dockerImage = dockerImage;
-    this.resource = resource;
-  }
-
-  public static RoleParameters createEmpty(Role role) {
-    return new RoleParameters(role, 0, null, null, null);
-  }
-
-  public Role getRole() {
-    return role;
-  }
-
-  public int getReplicas() {
-    return replicas;
-  }
-
-  public String getLaunchCommand() {
-    return launchCommand;
-  }
-
-  public RoleParameters setLaunchCommand(String launchCommand) {
-    this.launchCommand = launchCommand;
-    return this;
-  }
-
-  public String getDockerImage() {
-    return dockerImage;
-  }
-
-  public RoleParameters setDockerImage(String dockerImage) {
-    this.dockerImage = dockerImage;
-    return this;
-  }
-
-  public Resource getResource() {
-    return resource;
-  }
-
-  public RoleParameters setResource(Resource resource) {
-    this.resource = resource;
-    return this;
-  }
-
-  public RoleParameters setReplicas(int replicas) {
-    this.replicas = replicas;
-    return this;
-  }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java
deleted file mode 100644
index 5c1bec0..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.AbstractCli;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.CliUtils;
-import org.apache.submarine.client.cli.Command;
-import org.apache.submarine.client.cli.param.ParametersHolder;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.yaml.YamlConfigFile;
-import org.apache.submarine.client.cli.param.yaml.YamlParseException;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.Framework;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-import org.apache.submarine.commons.runtime.JobMonitor;
-import org.apache.submarine.commons.runtime.JobSubmitter;
-import org.apache.submarine.commons.runtime.fs.StorageKeyConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.Constructor;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This purpose of this class is to handle / parse CLI arguments related to
- * the run job Submarine command.
- */
-public class RunJobCli extends AbstractCli {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RunJobCli.class);
-  private static final String CAN_BE_USED_WITH_TF_PYTORCH_MXNET =
-          "Can be used with TensorFlow or PyTorch or MXNet frameworks.";
-  private static final String CAN_BE_USED_WITH_TF_MXNET =
-      "Can be used with TensorFlow or MXNet frameworks.";
-  private static final String CAN_BE_USED_WITH_TF_ONLY =
-      "Can only be used with TensorFlow framework.";
-  private static final String CAN_BE_USED_WITH_MXNET_ONLY =
-          "Can only be used with MXNet framework.";
-  public static final String YAML_PARSE_FAILED = "Failed to parse " +
-      "YAML config";
-
-
-  private Options options;
-  private JobSubmitter jobSubmitter;
-  private JobMonitor jobMonitor;
-  private ParametersHolder parametersHolder;
-
-  public RunJobCli(ClientContext cliContext) {
-    this(cliContext, cliContext.getRuntimeFactory().getJobSubmitterInstance(),
-        cliContext.getRuntimeFactory().getJobMonitorInstance());
-  }
-
-  @VisibleForTesting
-  public RunJobCli(ClientContext cliContext, JobSubmitter jobSubmitter,
-      JobMonitor jobMonitor) {
-    super(cliContext);
-    this.options = generateOptions();
-    this.jobSubmitter = jobSubmitter;
-    this.jobMonitor = jobMonitor;
-  }
-
-  public void printUsages() {
-    new HelpFormatter().printHelp("job run", options);
-  }
-
-  private Options generateOptions() {
-    Options options = new Options();
-    options.addOption(CliConstants.YAML_CONFIG, true,
-        "Config file (in YAML format)");
-    options.addOption(CliConstants.FRAMEWORK, true,
-        String.format("Framework to use. Valid values are: %s! " +
-                "The default framework is Tensorflow.",
-            Framework.getValues()));
-    options.addOption(CliConstants.NAME, true, "Name of the job");
-    options.addOption(CliConstants.INPUT_PATH, true,
-        "Input of the job, could be local or other FS directory");
-    options.addOption(CliConstants.CHECKPOINT_PATH, true,
-        "Training output directory of the job, "
-            + "could be local or other FS directory. This typically includes "
-            + "checkpoint files and exported model ");
-    options.addOption(CliConstants.SAVED_MODEL_PATH, true,
-        "Model exported path (savedmodel) of the job, which is needed when "
-            + "exported model is not placed under ${checkpoint_path}"
-            + "could be local or other FS directory. " +
-            "This will be used to serve.");
-    options.addOption(CliConstants.DOCKER_IMAGE, true, "Docker image name/tag");
-    options.addOption(CliConstants.QUEUE, true,
-        "Name of queue to run the job, by default it uses default queue");
-
-    addWorkerOptions(options);
-    addPSOptions(options);
-    addSchedulerOptions(options);
-    addTensorboardOptions(options);
-
-    options.addOption(CliConstants.ENV, true,
-        "Common environment variable of worker/ps");
-    options.addOption(CliConstants.VERBOSE, false,
-        "Print verbose log for troubleshooting");
-    options.addOption(CliConstants.WAIT_JOB_FINISH, false,
-        "Specified when user want to wait the job finish");
-    options.addOption(CliConstants.QUICKLINK, true, "Specify quicklink so YARN"
-        + "web UI shows link to given role instance and port. When "
-        + "--tensorboard is specified, quicklink to tensorboard instance will "
-        + "be added automatically. The format of quick link is: "
-        + "Quick_link_label=http(or https)://role-name:port. For example, "
-        + "if want to link to first worker's 7070 port, and text of quicklink "
-        + "is Notebook_UI, user need to specify --quicklink "
-        + "Notebook_UI=https://master-0:7070");
-    options.addOption(CliConstants.LOCALIZATION, true, "Specify"
-        + " localization to make remote/local file/directory available to"
-        + " all container(Docker)."
-        + " Argument format is \"RemoteUri:LocalFilePath[:rw] \" (ro"
-        + " permission is not supported yet)"
-        + " The RemoteUri can be a file or directory in local or"
-        + " HDFS or s3 or abfs or http .etc."
-        + " The LocalFilePath can be absolute or relative."
-        + " If it's a relative path, it'll be"
-        + " under container's implied working directory"
-        + " but sub directory is not supported yet."
-        + " This option can be set multiple times."
-        + " Examples are \n"
-        + "-localization \"hdfs:///user/yarn/mydir2:/opt/data\"\n"
-        + "-localization \"s3a:///a/b/myfile1:./\"\n"
-        + "-localization \"https:///a/b/myfile2:./myfile\"\n"
-        + "-localization \"/user/yarn/mydir3:/opt/mydir3\"\n"
-        + "-localization \"./mydir1:.\"\n");
-    options.addOption(CliConstants.KEYTAB, true, "Specify keytab used by the " +
-        "job under security environment");
-    options.addOption(CliConstants.PRINCIPAL, true, "Specify principal used " +
-        "by the job under security environment");
-    options.addOption(CliConstants.DISTRIBUTE_KEYTAB, false, "Distribute " +
-        "local keytab to cluster machines for service authentication. If not " +
-        "specified, pre-distributed keytab of which path specified by" +
-        " parameter" + CliConstants.KEYTAB + " on cluster machines will be " +
-        "used");
-    options.addOption("h", "help", false, "Print help");
-    options.addOption("insecure", false, "Cluster is not Kerberos enabled.");
-    options.addOption("conf", true,
-        "User specified configuration, as key=val pairs.");
-    return options;
-  }
-
-  private void addWorkerOptions(Options options) {
-    options.addOption(CliConstants.N_WORKERS, true,
-        "Number of worker tasks of the job, by default it's 1." +
-            CAN_BE_USED_WITH_TF_PYTORCH_MXNET);
-    options.addOption(CliConstants.WORKER_DOCKER_IMAGE, true,
-        "Specify docker image for WORKER, when this is not specified, WORKER "
-            + "uses --" + CliConstants.DOCKER_IMAGE + " as default." +
-            CAN_BE_USED_WITH_TF_PYTORCH_MXNET);
-    options.addOption(CliConstants.WORKER_LAUNCH_CMD, true,
-        "Commandline of worker, arguments will be "
-            + "directly used to launch the worker" +
-            CAN_BE_USED_WITH_TF_PYTORCH_MXNET);
-    options.addOption(CliConstants.WORKER_RES, true,
-        "Resource of each worker, for example "
-            + "memory-mb=2048,vcores=2,yarn.io/gpu=2" +
-            CAN_BE_USED_WITH_TF_PYTORCH_MXNET);
-  }
-
-  private void addPSOptions(Options options) {
-    options.addOption(CliConstants.N_PS, true,
-        "Number of PS tasks of the job, by default it's 0. " +
-            CAN_BE_USED_WITH_TF_MXNET);
-    options.addOption(CliConstants.PS_DOCKER_IMAGE, true,
-        "Specify docker image for PS, when this is not specified, PS uses --"
-            + CliConstants.DOCKER_IMAGE + " as default." +
-            CAN_BE_USED_WITH_TF_MXNET);
-    options.addOption(CliConstants.PS_LAUNCH_CMD, true,
-        "Commandline of PS, arguments will be "
-            + "directly used to launch the PS" +
-            CAN_BE_USED_WITH_TF_MXNET);
-    options.addOption(CliConstants.PS_RES, true,
-        "Resource of each PS, for example "
-            + "memory-mb=2048,vcores=2,yarn.io/gpu=2" +
-            CAN_BE_USED_WITH_TF_MXNET);
-  }
-
-  private void addTensorboardOptions(Options options) {
-    options.addOption(CliConstants.TENSORBOARD, false,
-        "Should we run TensorBoard"
-            + " for this job? By default it's disabled." +
-            CAN_BE_USED_WITH_TF_ONLY);
-    options.addOption(CliConstants.TENSORBOARD_RESOURCES, true,
-        "Specify resources of Tensorboard, by default it is "
-            + CliConstants.TENSORBOARD_DEFAULT_RESOURCES + "." +
-            CAN_BE_USED_WITH_TF_ONLY);
-    options.addOption(CliConstants.TENSORBOARD_DOCKER_IMAGE, true,
-        "Specify Tensorboard docker image. when this is not "
-            + "specified, Tensorboard " + "uses --" + CliConstants.DOCKER_IMAGE
-            + " as default." +
-            CAN_BE_USED_WITH_TF_ONLY);
-  }
-
-  private void addSchedulerOptions(Options options) {
-    options.addOption(CliConstants.N_SCHEDULERS, true,
-        "Number of scheduler tasks of the job. " +
-        "It should be 1 or 0, by default it's 0."+
-        CAN_BE_USED_WITH_MXNET_ONLY);
-    options.addOption(CliConstants.SCHEDULER_DOCKER_IMAGE, true,
-        "Specify docker image for scheduler, when this is not specified, " +
-        "scheduler uses --" + CliConstants.DOCKER_IMAGE +
-        " as default. " + CAN_BE_USED_WITH_MXNET_ONLY);
-    options.addOption(CliConstants.SCHEDULER_LAUNCH_CMD, true,
-        "Commandline of scheduler, arguments will be " +
-        "directly used to launch the scheduler. " + CAN_BE_USED_WITH_MXNET_ONLY);
-    options.addOption(CliConstants.SCHEDULER_RES, true,
-        "Resource of each scheduler, for example " +
-        "memory-mb=2048,vcores=2. " + CAN_BE_USED_WITH_MXNET_ONLY);
-  }
-
-  private void parseCommandLineAndGetRunJobParameters(String[] args)
-      throws ParseException, IOException, YarnException {
-    try {
-      GnuParser parser = new GnuParser();
-      CommandLine cli = parser.parse(options, args);
-      parametersHolder = createParametersHolder(cli);
-      parametersHolder.updateParameters(clientContext);
-    } catch (ParseException e) {
-      LOG.error("Exception in parse: {}", e.getMessage());
-      printUsages();
-      throw e;
-    }
-  }
-
-  private ParametersHolder createParametersHolder(CommandLine cli)
-      throws ParseException, YarnException {
-    String yamlConfigFile =
-        cli.getOptionValue(CliConstants.YAML_CONFIG);
-    if (yamlConfigFile != null) {
-      YamlConfigFile yamlConfig = readYamlConfigFile(yamlConfigFile);
-      checkYamlConfig(yamlConfigFile, yamlConfig);
-      LOG.info("Using YAML configuration!");
-      return ParametersHolder.createWithCmdLineAndYaml(cli, yamlConfig,
-          Command.RUN_JOB);
-    } else {
-      LOG.info("Using CLI configuration!");
-      return ParametersHolder.createWithCmdLine(cli, Command.RUN_JOB);
-    }
-  }
-
-  private void checkYamlConfig(String yamlConfigFile,
-      YamlConfigFile yamlConfig) {
-    if (yamlConfig == null) {
-      throw new YamlParseException(String.format(
-          YAML_PARSE_FAILED + ", file is empty: %s", yamlConfigFile));
-    } else if (yamlConfig.getConfigs() == null) {
-      throw new YamlParseException(String.format(YAML_PARSE_FAILED +
-          ", config section should be defined, but it cannot be found in " +
-          "YAML file '%s'!", yamlConfigFile));
-    }
-  }
-
-  private YamlConfigFile readYamlConfigFile(String filename) {
-    Constructor constructor = new Constructor(YamlConfigFile.class);
-    constructor.setPropertyUtils(new RunJobParameters.UnderscoreConverterPropertyUtils());
-    try {
-      LOG.info("Reading YAML configuration from file: {}", filename);
-      Yaml yaml = new Yaml(constructor);
-      return yaml.loadAs(FileUtils.openInputStream(new File(filename)),
-          YamlConfigFile.class);
-    } catch (FileNotFoundException e) {
-      logExceptionOfYamlParse(filename, e);
-      throw new YamlParseException(YAML_PARSE_FAILED +
-          ", file does not exist!");
-    } catch (Exception e) {
-      logExceptionOfYamlParse(filename, e);
-      throw new YamlParseException(
-          String.format(YAML_PARSE_FAILED + ", details: %s", e.getMessage()));
-    }
-  }
-
-  private void logExceptionOfYamlParse(String filename, Exception e) {
-    LOG.error(String.format("Exception while parsing YAML file %s", filename),
-        e);
-  }
-
-  private void storeJobInformation(RunJobParameters parameters,
-      ApplicationId applicationId, String[] args) throws IOException {
-    String jobName = parameters.getName();
-    Map<String, String> jobInfo = new HashMap<>();
-    jobInfo.put(StorageKeyConstants.JOB_NAME, jobName);
-    jobInfo.put(StorageKeyConstants.APPLICATION_ID, applicationId.toString());
-
-    if (parameters.getCheckpointPath() != null) {
-      jobInfo.put(StorageKeyConstants.CHECKPOINT_PATH,
-          parameters.getCheckpointPath());
-    }
-    if (parameters.getInputPath() != null) {
-      jobInfo.put(StorageKeyConstants.INPUT_PATH,
-          parameters.getInputPath());
-    }
-    if (parameters.getSavedModelPath() != null) {
-      jobInfo.put(StorageKeyConstants.SAVED_MODEL_PATH,
-          parameters.getSavedModelPath());
-    }
-
-    String joinedArgs = String.join(" ", args);
-    jobInfo.put(StorageKeyConstants.JOB_RUN_ARGS, joinedArgs);
-    clientContext.getRuntimeFactory().getSubmarineStorage().addNewJob(jobName,
-        jobInfo);
-  }
-
-  @Override
-  public int run(String[] args)
-      throws ParseException, IOException, YarnException, SubmarineException {
-    if (CliUtils.argsForHelp(args)) {
-      printUsages();
-      return 0;
-    }
-
-    parseCommandLineAndGetRunJobParameters(args);
-    ApplicationId applicationId = jobSubmitter.submitJob(parametersHolder);
-    LOG.info("Submarine job is submitted, the job id is " + applicationId);
-    RunJobParameters parameters =
-        (RunJobParameters) parametersHolder.getParameters();
-    storeJobInformation(parameters, applicationId, args);
-    if (parameters.isWaitJobFinish()) {
-      this.jobMonitor.waitTrainingFinal(parameters.getName());
-    }
-
-    return 0;
-  }
-
-  @VisibleForTesting
-  public JobSubmitter getJobSubmitter() {
-    return jobSubmitter;
-  }
-
-  @VisibleForTesting
-  public RunJobParameters getRunJobParameters() {
-    return (RunJobParameters) parametersHolder.getParameters();
-  }
-
-  @VisibleForTesting
-  public ParametersHolder getParametersHolder() {
-    return parametersHolder;
-  }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/ShowJobCliParsingTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/ShowJobCliParsingTest.java
deleted file mode 100644
index 9bbbbe3..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/ShowJobCliParsingTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.param.ShowJobParameters;
-import org.apache.submarine.commons.runtime.MockClientContext;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-import org.apache.submarine.commons.runtime.RuntimeFactory;
-import org.apache.submarine.commons.runtime.fs.MemorySubmarineStorage;
-import org.apache.submarine.commons.runtime.fs.StorageKeyConstants;
-import org.apache.submarine.commons.runtime.fs.SubmarineStorage;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class ShowJobCliParsingTest {
-  @Before
-  public void before() {
-    SubmarineLogs.verboseOff();
-  }
-
-  @Test
-  public void testPrintHelp() throws IOException {
-    MockClientContext mockClientContext = new MockClientContext("testJob");
-    ShowJobCli showJobCli = new ShowJobCli(mockClientContext);
-    showJobCli.printUsages();
-  }
-
-  @Test
-  public void testShowJob()
-      throws InterruptedException, SubmarineException, YarnException,
-      ParseException, IOException {
-    MockClientContext mockClientContext = new MockClientContext("testJob");
-    ShowJobCli showJobCli = new ShowJobCli(mockClientContext) {
-      @Override
-      protected void getAndPrintJobInfo() {
-        // do nothing
-      }
-    };
-    showJobCli.run(new String[] { "--name", "my-job" });
-    ShowJobParameters parameters = showJobCli.getParameters();
-    Assert.assertEquals(parameters.getName(), "my-job");
-  }
-
-  private Map<String, String> getMockJobInfo(String jobName) {
-    Map<String, String> map = new HashMap<>();
-    map.put(StorageKeyConstants.APPLICATION_ID,
-        ApplicationId.newInstance(1234L, 1).toString());
-    map.put(StorageKeyConstants.JOB_RUN_ARGS, "job run 123456");
-    map.put(StorageKeyConstants.INPUT_PATH, "hdfs://" + jobName);
-    return map;
-  }
-
-  @Test
-  public void testSimpleShowJob()
-      throws InterruptedException, SubmarineException, YarnException,
-      ParseException, IOException {
-    SubmarineStorage storage = new MemorySubmarineStorage();
-    MockClientContext mockClientContext = new MockClientContext("testJob");
-    RuntimeFactory runtimeFactory = mock(RuntimeFactory.class);
-    when(runtimeFactory.getSubmarineStorage()).thenReturn(storage);
-    mockClientContext.setRuntimeFactory(runtimeFactory);
-
-    ShowJobCli showJobCli = new ShowJobCli(mockClientContext);
-
-    try {
-      showJobCli.run(new String[] { "--name", "my-job" });
-    } catch (IOException e) {
-      // expected
-    }
-
-
-    storage.addNewJob("my-job", getMockJobInfo("my-job"));
-    showJobCli.run(new String[] { "--name", "my-job" });
-  }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/YamlConfigTestUtils.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/YamlConfigTestUtils.java
deleted file mode 100644
index c639ba6..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/YamlConfigTestUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.submarine.client.cli.param.yaml.YamlConfigFile;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.Constructor;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Test utility class for test code that deals with YAML configuration parsing.
- */
-public final class YamlConfigTestUtils {
-
-  private YamlConfigTestUtils() {}
-
-  public static void deleteFile(File file) {
-    if (file != null) {
-      file.delete();
-    }
-  }
-
-  public static YamlConfigFile readYamlConfigFile(String filename) {
-    Constructor constructor = new Constructor(YamlConfigFile.class);
-    constructor.setPropertyUtils(new RunJobParameters.UnderscoreConverterPropertyUtils());
-    Yaml yaml = new Yaml(constructor);
-    InputStream inputStream = YamlConfigTestUtils.class
-        .getClassLoader()
-        .getResourceAsStream(filename);
-    return yaml.loadAs(inputStream, YamlConfigFile.class);
-  }
-
-  public static File createTempFileWithContents(String filename)
-      throws IOException {
-    InputStream inputStream = YamlConfigTestUtils.class
-        .getClassLoader()
-        .getResourceAsStream(filename);
-    File targetFile = File.createTempFile("test", ".yaml");
-    FileUtils.copyInputStreamToFile(inputStream, targetFile);
-    return targetFile;
-  }
-
-  public static File createEmptyTempFile() throws IOException {
-    return File.createTempFile("test", ".yaml");
-  }
-
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingCommonTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingCommonTest.java
deleted file mode 100644
index f75e2b8..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingCommonTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob;
-
-import org.apache.commons.cli.MissingArgumentException;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.param.ParametersHolder;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
-import org.apache.submarine.commons.runtime.MockClientContext;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.RuntimeFactory;
-import org.apache.submarine.commons.runtime.JobMonitor;
-import org.apache.submarine.commons.runtime.JobSubmitter;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-import org.apache.submarine.commons.runtime.fs.SubmarineStorage;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import java.io.IOException;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * This class contains some test methods to test common functionality
- * (including TF / PyTorch) of the run job Submarine command.
- */
-public class RunJobCliParsingCommonTest {
-
-  @Before
-  public void before() {
-    SubmarineLogs.verboseOff();
-  }
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  public static MockClientContext getMockClientContext()
-          throws IOException, YarnException, SubmarineException {
-    MockClientContext mockClientContext = new MockClientContext("testJob");
-    JobSubmitter mockJobSubmitter = mock(JobSubmitter.class);
-    when(mockJobSubmitter.submitJob(any(ParametersHolder.class)))
-        .thenReturn(ApplicationId.newInstance(1235L, 1));
-
-    JobMonitor mockJobMonitor = mock(JobMonitor.class);
-    SubmarineStorage storage = mock(SubmarineStorage.class);
-    RuntimeFactory rtFactory = mock(RuntimeFactory.class);
-
-    when(rtFactory.getJobSubmitterInstance()).thenReturn(mockJobSubmitter);
-    when(rtFactory.getJobMonitorInstance()).thenReturn(mockJobMonitor);
-    when(rtFactory.getSubmarineStorage()).thenReturn(storage);
-
-    mockClientContext.setRuntimeFactory(rtFactory);
-    return mockClientContext;
-  }
-
-  @Test
-  public void testAbsentFrameworkFallsBackToTensorFlow() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    runJobCli.run(
-        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input", "--checkpoint_path",
-            "hdfs://output",
-            "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
-            "true", "--verbose", "--wait_job_finish"});
-    RunJobParameters runJobParameters = runJobCli.getRunJobParameters();
-    assertTrue("Default Framework should be TensorFlow!",
-        runJobParameters instanceof TensorFlowRunJobParameters);
-  }
-
-  @Test
-  public void testEmptyFrameworkOption() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(MissingArgumentException.class);
-    expectedException.expectMessage("Missing argument for option: framework");
-
-    runJobCli.run(
-        new String[]{"--framework", "--name", "my-job",
-            "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input", "--checkpoint_path",
-            "hdfs://output",
-            "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
-            "true", "--verbose", "--wait_job_finish"});
-  }
-
-  @Test
-  public void testInvalidFrameworkOption() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("Failed to parse Framework type");
-
-    runJobCli.run(
-        new String[]{"--framework", "bla", "--name", "my-job",
-            "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input", "--checkpoint_path",
-            "hdfs://output",
-            "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
-            "true", "--verbose", "--wait_job_finish"});
-  }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingCommonYamlTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingCommonYamlTest.java
deleted file mode 100644
index c9d6a2d..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingCommonYamlTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.YamlConfigTestUtils;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
-import org.apache.submarine.client.cli.param.yaml.YamlParseException;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-import static org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest.getMockClientContext;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * This class contains some test methods to test common YAML parsing
- * functionality (including TF / PyTorch) of the run job Submarine command.
- */
-public class RunJobCliParsingCommonYamlTest {
-  private static final String DIR_NAME = "runjob-common-yaml";
-  private static final String TF_DIR = "runjob-pytorch-yaml";
-  private File yamlConfig;
-  private static Logger LOG = LoggerFactory.getLogger(
-      RunJobCliParsingCommonYamlTest.class);
-
-  @Before
-  public void before() {
-    SubmarineLogs.verboseOff();
-  }
-
-  @After
-  public void after() {
-    YamlConfigTestUtils.deleteFile(yamlConfig);
-  }
-
-  @BeforeClass
-  public static void configureResourceTypes() {
-    try {
-      ResourceUtils.configureResourceType(ResourceUtils.GPU_URI);
-    } catch (SubmarineRuntimeException e) {
-      LOG.info("The hadoop dependency doesn't support gpu resource, " +
-          "so just skip this test case.");
-    }
-  }
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  @Test
-  public void testYamlAndCliOptionIsDefinedIsInvalid() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        TF_DIR + "/valid-config.yaml");
-    String[] args = new String[] {"--name", "my-job",
-        "--docker_image", "tf-docker:1.1.0",
-        "-f", yamlConfig.getAbsolutePath() };
-
-    exception.expect(YarnException.class);
-    exception.expectMessage("defined both with YAML config and with " +
-        "CLI argument");
-
-    runJobCli.run(args);
-  }
-
-  @Test
-  public void testYamlAndCliOptionIsDefinedIsInvalidWithListOption()
-      throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        TF_DIR + "/valid-config.yaml");
-    String[] args = new String[] {"--name", "my-job",
-        "--quicklink", "AAA=http://master-0:8321",
-        "--quicklink", "BBB=http://worker-0:1234",
-        "-f", yamlConfig.getAbsolutePath()};
-
-    exception.expect(YarnException.class);
-    exception.expectMessage("defined both with YAML config and with " +
-        "CLI argument");
-
-    runJobCli.run(args);
-  }
-
-  @Test
-  public void testFalseValuesForBooleanFields() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/test-false-values.yaml");
-    runJobCli.run(
-        new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-
-    assertTrue(RunJobParameters.class + " must be an instance of " +
-            TensorFlowRunJobParameters.class,
-        jobRunParameters instanceof TensorFlowRunJobParameters);
-    TensorFlowRunJobParameters tensorFlowParams =
-        (TensorFlowRunJobParameters) jobRunParameters;
-
-    assertFalse(jobRunParameters.isDistributeKeytab());
-    assertFalse(jobRunParameters.isWaitJobFinish());
-    assertFalse(tensorFlowParams.isTensorboardEnabled());
-  }
-
-  @Test
-  public void testWrongIndentation() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/wrong-indentation.yaml");
-
-    exception.expect(YamlParseException.class);
-    exception.expectMessage("Failed to parse YAML config, details:");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-
-  @Test
-  public void testWrongFilename() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    exception.expect(YamlParseException.class);
-    runJobCli.run(
-        new String[]{"-f", "not-existing", "--verbose"});
-  }
-
-  @Test
-  public void testEmptyFile() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
-    yamlConfig = YamlConfigTestUtils.createEmptyTempFile();
-
-    exception.expect(YamlParseException.class);
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-
-  @Test
-  public void testNotExistingFile() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
-    exception.expect(YamlParseException.class);
-    exception.expectMessage("file does not exist");
-    runJobCli.run(
-        new String[]{"-f", "blabla", "--verbose"});
-  }
-
-  @Test
-  public void testWrongPropertyName() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/wrong-property-name.yaml");
-
-    exception.expect(YamlParseException.class);
-    exception.expectMessage("Failed to parse YAML config, details:");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-
-  @Test
-  public void testMissingConfigsSection() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/missing-configs.yaml");
-
-    exception.expect(YamlParseException.class);
-    exception.expectMessage("config section should be defined, " +
-        "but it cannot be found");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-
-  @Test
-  public void testMissingSectionsShouldParsed() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/some-sections-missing.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-
-
-  @Test
-  public void testAbsentFramework() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/missing-framework.yaml");
-
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-
-  @Test
-  public void testEmptyFramework() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/empty-framework.yaml");
-
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-
-  @Test
-  public void testInvalidFramework() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/invalid-framework.yaml");
-
-    exception.expect(YamlParseException.class);
-    exception.expectMessage("framework should is defined, " +
-        "but it has an invalid value");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingParameterizedTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingParameterizedTest.java
deleted file mode 100644
index 5e5ab09..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingParameterizedTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob;
-import com.google.common.collect.Lists;
-import org.apache.commons.cli.ParseException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.param.runjob.PyTorchRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
-import org.apache.submarine.commons.runtime.Framework;
-import org.apache.submarine.commons.runtime.MockClientContext;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.JobMonitor;
-import org.apache.submarine.commons.runtime.JobSubmitter;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest.getMockClientContext;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-/**
- * This class contains some test methods to test common CLI parsing
- * functionality (including TF / PyTorch) of the run job Submarine command.
- */
-@RunWith(Parameterized.class)
-public class RunJobCliParsingParameterizedTest {
-
-  private final Framework framework;
-
-  @Before
-  public void before() {
-    SubmarineLogs.verboseOff();
-  }
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> data() {
-    Collection<Object[]> params = new ArrayList<>();
-    params.add(new Object[]{Framework.TENSORFLOW });
-    params.add(new Object[]{Framework.PYTORCH });
-    return params;
-  }
-
-  public RunJobCliParsingParameterizedTest(Framework framework) {
-    this.framework = framework;
-  }
-
-  private String getFrameworkName() {
-    return framework.getValue();
-  }
-
-  @Test
-  public void testPrintHelp() throws IOException {
-    MockClientContext mockClientContext = new MockClientContext("testJob");
-    JobSubmitter mockJobSubmitter = mock(JobSubmitter.class);
-    JobMonitor mockJobMonitor = mock(JobMonitor.class);
-    RunJobCli runJobCli = new RunJobCli(mockClientContext, mockJobSubmitter,
-        mockJobMonitor);
-    runJobCli.printUsages();
-  }
-
-  @Test
-  public void testNoInputPathOptionSpecified() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-    String expectedErrorMessage = "\"--" + CliConstants.INPUT_PATH + "\"" +
-        " is absent";
-    String actualMessage = "";
-    try {
-      runJobCli.run(
-          new String[]{"--framework", getFrameworkName(),
-              "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-              "--checkpoint_path", "hdfs://output",
-              "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
-              "--worker_resources", "memory=4g,vcores=2", "--verbose",
-              "--wait_job_finish"});
-    } catch (ParseException e) {
-      actualMessage = e.getMessage();
-      e.printStackTrace();
-    }
-    assertEquals(expectedErrorMessage, actualMessage);
-  }
-
-  @Test
-  public void testJobWithoutName() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-    String expectedErrorMessage =
-        "--" + CliConstants.NAME + " is absent";
-    String actualMessage = "";
-    try {
-      runJobCli.run(
-          new String[]{"--framework", getFrameworkName(),
-              "--docker_image", "tf-docker:1.1.0",
-              "--num_workers", "0", "--verbose"});
-    } catch (ParseException e) {
-      actualMessage = e.getMessage();
-      e.printStackTrace();
-    }
-    assertEquals(expectedErrorMessage, actualMessage);
-  }
-
-  @Test
-  public void testLaunchCommandPatternReplace() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    List<String> parameters = Lists.newArrayList("--framework",
-        getFrameworkName(),
-        "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-        "--input_path", "hdfs://input", "--checkpoint_path",
-        "hdfs://output",
-        "--num_workers", "3",
-        "--worker_launch_cmd", "python run-job.py --input=%input_path% " +
-            "--model_dir=%checkpoint_path% " +
-            "--export_dir=%saved_model_path%/savedmodel",
-        "--worker_resources", "memory=2048,vcores=2");
-
-    if (framework == Framework.TENSORFLOW) {
-      parameters.addAll(Lists.newArrayList(
-          "--ps_resources", "memory=4096,vcores=4",
-          "--ps_launch_cmd", "python run-ps.py --input=%input_path% " +
-              "--model_dir=%checkpoint_path%/model",
-          "--verbose"));
-    }
-
-    runJobCli.run(parameters.toArray(new String[0]));
-
-    RunJobParameters runJobParameters = checkExpectedFrameworkParams(runJobCli);
-
-    if (framework == Framework.TENSORFLOW) {
-      TensorFlowRunJobParameters tensorFlowParams =
-          (TensorFlowRunJobParameters) runJobParameters;
-      assertEquals(
-          "python run-job.py --input=hdfs://input --model_dir=hdfs://output "
-              + "--export_dir=hdfs://output/savedmodel",
-          tensorFlowParams.getWorkerLaunchCmd());
-      assertEquals(
-          "python run-ps.py --input=hdfs://input " +
-              "--model_dir=hdfs://output/model",
-          tensorFlowParams.getPSLaunchCmd());
-    } else if (framework == Framework.PYTORCH) {
-      PyTorchRunJobParameters pyTorchParameters =
-          (PyTorchRunJobParameters) runJobParameters;
-      assertEquals(
-          "python run-job.py --input=hdfs://input --model_dir=hdfs://output "
-              + "--export_dir=hdfs://output/savedmodel",
-          pyTorchParameters.getWorkerLaunchCmd());
-    }
-  }
-
-  private RunJobParameters checkExpectedFrameworkParams(RunJobCli runJobCli) {
-    RunJobParameters runJobParameters = runJobCli.getRunJobParameters();
-
-    if (framework == Framework.TENSORFLOW) {
-      assertTrue(RunJobParameters.class + " must be an instance of " +
-              TensorFlowRunJobParameters.class,
-          runJobParameters instanceof TensorFlowRunJobParameters);
-    } else if (framework == Framework.PYTORCH) {
-      assertTrue(RunJobParameters.class + " must be an instance of " +
-              PyTorchRunJobParameters.class,
-          runJobParameters instanceof PyTorchRunJobParameters);
-    }
-    return runJobParameters;
-  }
-
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/mxnet/RunJobCliParsingMXNetTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/mxnet/RunJobCliParsingMXNetTest.java
deleted file mode 100644
index c6c0678..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/mxnet/RunJobCliParsingMXNetTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.mxnet;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.submarine.client.cli.param.runjob.MXNetRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.runjob.RunJobCli;
-import org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import static org.junit.Assert.*;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test class that verifies the correctness of MXNet
- * CLI configuration parsing.
- */
-public class RunJobCliParsingMXNetTest {
-  @Before
-  public void before() {
-    SubmarineLogs.verboseOff();
-  }
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  @Test
-  public void testBasicRunJobForSingleNodeTraining() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    runJobCli.run(
-        new String[]{"--framework", "mxnet", "--name", "my-job",
-            "--docker_image", "mxnet-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--num_workers", "1", "--num_ps", "1", "--worker_launch_cmd",
-            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
-            "--ps_resources", "memory=4G,vcores=2", "--ps_launch_cmd",
-            "python run-ps.py", "--num_schedulers", "1", "--scheduler_launch_cmd",
-            "python run-scheduler.py", "--scheduler_resources", "memory=1024M,vcores=2",
-            "--verbose", "--wait_job_finish"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    assertTrue(RunJobParameters.class +
-        " must be an instance of " +
-        MXNetRunJobParameters.class,
-        jobRunParameters instanceof MXNetRunJobParameters);
-    MXNetRunJobParameters mxNetParams =
-        (MXNetRunJobParameters) jobRunParameters;
-
-    assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
-    assertEquals(mxNetParams.getNumWorkers(), 1);
-    assertEquals(mxNetParams.getWorkerLaunchCmd(), "python run-job.py");
-    assertEquals(Resources.createResource(2048, 2),
-        mxNetParams.getWorkerResource());
-    assertEquals(mxNetParams.getNumPS(), 1);
-    assertEquals(mxNetParams.getNumSchedulers(), 1);
-    assertTrue(SubmarineLogs.isVerbose());
-    assertTrue(jobRunParameters.isWaitJobFinish());
-  }
-
-  @Test
-  public void testBasicRunJobForDistributedTraining() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-    runJobCli.run(
-        new String[]{"--framework", "mxnet", "--name", "my-job",
-            "--docker_image", "mxnet-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--num_workers", "2", "--num_ps", "2", "--worker_launch_cmd",
-            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
-            "--ps_resources", "memory=4G,vcores=2", "--ps_launch_cmd",
-            "python run-ps.py", "--num_schedulers", "1", "--scheduler_launch_cmd",
-            "python run-scheduler.py", "--scheduler_resources", "memory=1024M,vcores=2",
-            "--verbose"});
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    assertTrue(RunJobParameters.class +
-        " must be an instance of " +
-        MXNetRunJobParameters.class,
-        jobRunParameters instanceof MXNetRunJobParameters);
-    MXNetRunJobParameters mxNetParams =
-        (MXNetRunJobParameters) jobRunParameters;
-
-    assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
-    assertEquals(jobRunParameters.getDockerImageName(), "mxnet-docker:1.1.0");
-    assertEquals(mxNetParams.getNumWorkers(), 2);
-    assertEquals(Resources.createResource(2048, 2),
-        mxNetParams.getWorkerResource());
-    assertEquals(mxNetParams.getWorkerLaunchCmd(), "python run-job.py");
-    assertEquals(mxNetParams.getNumPS(), 2);
-    assertEquals(Resources.createResource(4096, 2),
-        mxNetParams.getPsResource());
-    assertEquals(mxNetParams.getPSLaunchCmd(), "python run-ps.py");
-    assertEquals(mxNetParams.getNumSchedulers(), 1);
-    assertEquals(Resources.createResource(1024, 2),
-        mxNetParams.getSchedulerResource());
-    assertEquals(mxNetParams.getSchedulerLaunchCmd(), "python run-scheduler.py");
-    assertTrue(SubmarineLogs.isVerbose());
-  }
-
-  @Test
-  public void testTensorboardCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for MXNet jobs");
-    runJobCli.run(
-        new String[]{"--framework", "mxnet",
-            "--name", "my-job", "--docker_image", "mxnet-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--num_workers", "2",
-            "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=2048M,vcores=2",
-            "--tensorboard"});
-  }
-
-  @Test
-  public void testTensorboardResourcesCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for MXNet jobs");
-    runJobCli.run(
-        new String[]{"--framework", "mxnet",
-            "--name", "my-job", "--docker_image", "mxnet-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--num_workers", "2",
-            "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=2048M,vcores=2",
-            "--tensorboard_resources", "memory=1024M,vcores=2"});
-  }
-
-  @Test
-  public void testTensorboardDockerImageCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for MXNet jobs");
-    runJobCli.run(
-        new String[]{"--framework", "mxnet",
-            "--name", "my-job", "--docker_image", "mxnet-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--num_workers", "2",
-            "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=2048M,vcores=2",
-            "--tensorboard_docker_image", "TBDockerImage"});
-  }
-}
-
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/mxnet/RunJobCliParsingMXNetYamlTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/mxnet/RunJobCliParsingMXNetYamlTest.java
deleted file mode 100644
index 0aa60f6..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/mxnet/RunJobCliParsingMXNetYamlTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.mxnet;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.submarine.client.cli.YamlConfigTestUtils;
-import org.apache.submarine.client.cli.param.runjob.MXNetRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.yaml.YamlParseException;
-import org.apache.submarine.client.cli.runjob.RunJobCli;
-import org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-import org.junit.*;
-import org.junit.rules.ExpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.List;
-
-import static org.junit.Assert.*;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Test class that verifies the correctness of MXNet
- * YAML configuration parsing.
- */
-public class RunJobCliParsingMXNetYamlTest {
-  private static final String OVERRIDDEN_PREFIX = "overridden_";
-  private static final String DIR_NAME = "runjob-mxnet-yaml";
-  private File yamlConfig;
-  private static Logger LOG = LoggerFactory.getLogger(
-    RunJobCliParsingMXNetYamlTest.class);
-
-  @Before
-  public void before() {
-    SubmarineLogs.verboseOff();
-  }
-
-  @After
-  public void after() {
-    YamlConfigTestUtils.deleteFile(yamlConfig);
-  }
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  private void verifyBasicConfigValues(RunJobParameters jobRunParameters) {
-    verifyBasicConfigValues(jobRunParameters,
-        ImmutableList.of("env1=env1Value", "env2=env2Value"));
-  }
-
-  private void verifyBasicConfigValues(RunJobParameters jobRunParameters,
-      List<String> expectedEnvs) {
-    assertEquals("testInputPath", jobRunParameters.getInputPath());
-    assertEquals("testCheckpointPath", jobRunParameters.getCheckpointPath());
-    assertEquals("testDockerImage", jobRunParameters.getDockerImageName());
-
-    assertNotNull(jobRunParameters.getLocalizations());
-    assertEquals(2, jobRunParameters.getLocalizations().size());
-
-    assertNotNull(jobRunParameters.getQuicklinks());
-    assertEquals(2, jobRunParameters.getQuicklinks().size());
-
-    assertTrue(SubmarineLogs.isVerbose());
-    assertTrue(jobRunParameters.isWaitJobFinish());
-
-    for (String env : expectedEnvs) {
-      assertTrue(String.format(
-          "%s should be in env list of jobRunParameters!", env),
-          jobRunParameters.getEnvars().contains(env));
-    }
-  }
-
-  private void verifyPsValues(RunJobParameters jobRunParameters,
-      String prefix) {
-    assertTrue(RunJobParameters.class + " must be an instance of " +
-            MXNetRunJobParameters.class,
-        jobRunParameters instanceof MXNetRunJobParameters);
-    MXNetRunJobParameters mxNetParams =
-        (MXNetRunJobParameters) jobRunParameters;
-
-    assertEquals(4, mxNetParams.getNumPS());
-    assertEquals(prefix + "testLaunchCmdPs", mxNetParams.getPSLaunchCmd());
-    assertEquals(prefix + "testDockerImagePs",
-        mxNetParams.getPsDockerImage());
-    assertEquals(Resources.createResource(20500, 34),
-        mxNetParams.getPsResource());
-  }
-
-  private void verifySchedulerValues(RunJobParameters jobRunParameters,
-      String prefix) {
-    assertTrue(RunJobParameters.class + " must be an instance of " +
-        MXNetRunJobParameters.class, jobRunParameters instanceof MXNetRunJobParameters);
-    MXNetRunJobParameters mxNetParams = (MXNetRunJobParameters) jobRunParameters;
-    assertEquals(1, mxNetParams.getNumSchedulers());
-    assertEquals(prefix + "testLaunchCmdScheduler",
-        mxNetParams.getSchedulerLaunchCmd());
-    assertEquals(prefix + "testDockerImageScheduler", mxNetParams.getSchedulerDockerImage());
-    assertEquals(Resources.createResource(10240, 16),
-        mxNetParams.getSchedulerResource());
-  }
-
-  private void verifyWorkerValues(RunJobParameters jobRunParameters, String prefix) {
-    MXNetRunJobParameters mxNetParams =
-        verifyWorkerCommonValues(jobRunParameters, prefix);
-    assertEquals(Resources.createResource(20480, 32),
-        mxNetParams.getWorkerResource());
-  }
-
-  private MXNetRunJobParameters verifyWorkerCommonValues(
-          RunJobParameters jobRunParameters, String prefix) {
-    assertTrue(RunJobParameters.class + " must be an instance of " +
-                    MXNetRunJobParameters.class,
-            jobRunParameters instanceof MXNetRunJobParameters);
-    MXNetRunJobParameters mxNetParams =
-            (MXNetRunJobParameters) jobRunParameters;
-
-    assertEquals(3, mxNetParams.getNumWorkers());
-    assertEquals(prefix + "testLaunchCmdWorker",
-            mxNetParams.getWorkerLaunchCmd());
-    assertEquals(prefix + "testDockerImageWorker",
-            mxNetParams.getWorkerDockerImage());
-    return mxNetParams;
-  }
-
-  private void verifyWorkerValuesWithGpu(RunJobParameters jobRunParameters, String prefix) {
-    MXNetRunJobParameters mxNetParams =
-        verifyWorkerCommonValues(jobRunParameters, prefix);
-    Resource workResource = Resources.createResource(20480, 32);
-    ResourceUtils.setResource(workResource, ResourceUtils.GPU_URI, 2);
-    assertEquals(workResource, mxNetParams.getWorkerResource());
-  }
-
-  private void verifySecurityValues(RunJobParameters jobRunParameters) {
-    assertEquals("keytabPath", jobRunParameters.getKeytab());
-    assertEquals("testPrincipal", jobRunParameters.getPrincipal());
-    assertTrue(jobRunParameters.isDistributeKeytab());
-  }
-
-  @Test
-  public void testValidYamlParsing() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/valid-config.yaml");
-    runJobCli.run(
-        new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters);
-    verifyPsValues(jobRunParameters, "");
-    verifySchedulerValues(jobRunParameters, "");
-    verifyWorkerValues(jobRunParameters, "");
-    verifySecurityValues(jobRunParameters);
-  }
-
-  @Test
-  public void testValidGpuYamlParsing() throws Exception {
-    try {
-      ResourceUtils.configureResourceType(ResourceUtils.GPU_URI);
-    } catch (SubmarineRuntimeException e) {
-      LOG.info("The hadoop dependency doesn't support gpu resource, " +
-          "so just skip this test case.");
-      return;
-    }
-
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/valid-gpu-config.yaml");
-    runJobCli.run(
-        new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters);
-    verifyPsValues(jobRunParameters, "");
-    verifySchedulerValues(jobRunParameters, "");
-    verifyWorkerValuesWithGpu(jobRunParameters, "");
-    verifySecurityValues(jobRunParameters);
-  }
-
-  @Test
-  public void testRoleOverrides() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/valid-config-with-overrides.yaml");
-
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters);
-    verifyPsValues(jobRunParameters, OVERRIDDEN_PREFIX);
-    verifySchedulerValues(jobRunParameters, OVERRIDDEN_PREFIX);
-    verifyWorkerValues(jobRunParameters, OVERRIDDEN_PREFIX);
-    verifySecurityValues(jobRunParameters);
-  }
-
-  @Test
-  public void testMissingPrincipalUnderSecuritySection() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/security-principal-is-missing.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters);
-    verifyPsValues(jobRunParameters, "");
-    verifySchedulerValues(jobRunParameters, "");
-    verifyWorkerValues(jobRunParameters, "");
-
-    //Verify security values
-    assertEquals("keytabPath", jobRunParameters.getKeytab());
-    assertNull("Principal should be null!", jobRunParameters.getPrincipal());
-    assertTrue(jobRunParameters.isDistributeKeytab());
-  }
-
-  @Test
-  public void testMissingEnvs() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/envs-are-missing.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters, ImmutableList.of());
-    verifyPsValues(jobRunParameters, "");
-    verifySchedulerValues(jobRunParameters, "");
-    verifyWorkerValues(jobRunParameters, "");
-    verifySecurityValues(jobRunParameters);
-  }
-
-  @Test
-  public void testInvalidConfigTensorboardSectionIsDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    exception.expect(YamlParseException.class);
-    exception.expectMessage("TensorBoard section should not be defined " +
-        "when TensorFlow is not the selected framework!");
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/invalid-config-tensorboard-section.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-}
\ No newline at end of file
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/pytorch/RunJobCliParsingPyTorchTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/pytorch/RunJobCliParsingPyTorchTest.java
deleted file mode 100644
index b1ec19b..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/pytorch/RunJobCliParsingPyTorchTest.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.pytorch;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.submarine.client.cli.param.runjob.PyTorchRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.runjob.RunJobCli;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test class that verifies the correctness of PyTorch
- * CLI configuration parsing.
- */
-public class RunJobCliParsingPyTorchTest {
-
-  @Before
-  public void before() {
-    SubmarineLogs.verboseOff();
-  }
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  @Test
-  public void testBasicRunJobForSingleNodeTraining() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    runJobCli.run(
-        new String[]{"--framework", "pytorch",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input", "--checkpoint_path",
-            "hdfs://output",
-            "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=4g,vcores=2", "--verbose",
-            "--wait_job_finish"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    assertTrue(RunJobParameters.class +
-            " must be an instance of " +
-            PyTorchRunJobParameters.class,
-        jobRunParameters instanceof PyTorchRunJobParameters);
-    PyTorchRunJobParameters pyTorchParams =
-        (PyTorchRunJobParameters) jobRunParameters;
-
-    assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
-    assertEquals(jobRunParameters.getCheckpointPath(), "hdfs://output");
-    assertEquals(pyTorchParams.getNumWorkers(), 1);
-    assertEquals(pyTorchParams.getWorkerLaunchCmd(),
-        "python run-job.py");
-    assertEquals(Resources.createResource(4096, 2),
-        pyTorchParams.getWorkerResource());
-    assertTrue(SubmarineLogs.isVerbose());
-    assertTrue(jobRunParameters.isWaitJobFinish());
-  }
-
-  @Test
-  public void testNumPSCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for PyTorch jobs");
-    runJobCli.run(
-        new String[]{"--framework", "pytorch",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--checkpoint_path", "hdfs://output",
-            "--num_workers", "3",
-            "--worker_launch_cmd",
-            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
-            "--num_ps", "2"});
-  }
-
-  @Test
-  public void testPSResourcesCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for PyTorch jobs");
-    runJobCli.run(
-        new String[]{"--framework", "pytorch",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--checkpoint_path", "hdfs://output",
-            "--num_workers", "3",
-            "--worker_launch_cmd",
-            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
-            "--ps_resources", "memory=2048M,vcores=2"});
-  }
-
-  @Test
-  public void testPSDockerImageCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for PyTorch jobs");
-    runJobCli.run(
-        new String[]{"--framework", "pytorch",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--checkpoint_path", "hdfs://output",
-            "--num_workers", "3",
-            "--worker_launch_cmd",
-            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
-            "--ps_docker_image", "psDockerImage"});
-  }
-
-  @Test
-  public void testPSLaunchCommandCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for PyTorch jobs");
-    runJobCli.run(
-        new String[]{"--framework", "pytorch",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--checkpoint_path", "hdfs://output",
-            "--num_workers", "3",
-            "--worker_launch_cmd",
-            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
-            "--ps_launch_cmd", "psLaunchCommand"});
-  }
-
-  @Test
-  public void testTensorboardCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for PyTorch jobs");
-    runJobCli.run(
-        new String[]{"--framework", "pytorch",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--checkpoint_path", "hdfs://output",
-            "--num_workers", "3",
-            "--worker_launch_cmd",
-            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
-            "--tensorboard"});
-  }
-
-  @Test
-  public void testTensorboardResourcesCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for PyTorch jobs");
-    runJobCli.run(
-        new String[]{"--framework", "pytorch",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--checkpoint_path", "hdfs://output",
-            "--num_workers", "3",
-            "--worker_launch_cmd",
-            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
-            "--tensorboard_resources", "memory=2048M,vcores=2"});
-  }
-
-  @Test
-  public void testTensorboardDockerImageCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for PyTorch jobs");
-    runJobCli.run(
-        new String[]{"--framework", "pytorch",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--checkpoint_path", "hdfs://output",
-            "--num_workers", "3",
-            "--worker_launch_cmd",
-            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
-            "--tensorboard_docker_image", "TBDockerImage"});
-  }
-
-  @Test
-  public void testNumSchedulerCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for PyTorch jobs");
-    runJobCli.run(
-        new String[]{"--framework", "pytorch",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--checkpoint_path", "hdfs://output",
-            "--num_workers", "3",
-            "--worker_launch_cmd",
-            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
-            "--num_schedulers", "1"});
-  }
-
-  @Test
-  public void testSchedulerResourcesCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for PyTorch jobs");
-    runJobCli.run(
-        new String[]{"--framework", "pytorch",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--checkpoint_path", "hdfs://output",
-            "--num_workers", "3",
-            "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=2048M,vcores=2",
-            "--scheduler_resources", "memory=2048M,vcores=2"});
-  }
-
-  @Test
-  public void testSchedulerDockerImageCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for PyTorch jobs");
-    runJobCli.run(
-        new String[]{"--framework", "pytorch",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--checkpoint_path", "hdfs://output",
-            "--num_workers", "3",
-            "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=2048M,vcores=2",
-            "--scheduler_docker_image", "schedulerDockerImage"});
-  }
-
-  @Test
-  public void testSchedulerLaunchCommandCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for PyTorch jobs");
-    runJobCli.run(
-        new String[]{"--framework", "pytorch",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--checkpoint_path", "hdfs://output",
-            "--num_workers", "3",
-            "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=2048M,vcores=2",
-            "--scheduler_launch_cmd", "schedulerLaunchCommand"});
-  }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/pytorch/RunJobCliParsingPyTorchYamlTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/pytorch/RunJobCliParsingPyTorchYamlTest.java
deleted file mode 100644
index 2ab075f..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/pytorch/RunJobCliParsingPyTorchYamlTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.pytorch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.submarine.client.cli.YamlConfigTestUtils;
-import org.apache.submarine.client.cli.param.runjob.PyTorchRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.yaml.YamlParseException;
-import org.apache.submarine.client.cli.runjob.RunJobCli;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test class that verifies the correctness of PyTorch
- * YAML configuration parsing.
- */
-public class RunJobCliParsingPyTorchYamlTest {
-  private static final String OVERRIDDEN_PREFIX = "overridden_";
-  private static final String DIR_NAME = "runjob-pytorch-yaml";
-  private File yamlConfig;
-  private static Logger LOG = LoggerFactory.getLogger(
-      RunJobCliParsingPyTorchYamlTest.class);
-
-  @Before
-  public void before() {
-    SubmarineLogs.verboseOff();
-  }
-
-  @After
-  public void after() {
-    YamlConfigTestUtils.deleteFile(yamlConfig);
-  }
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  private void verifyBasicConfigValues(RunJobParameters jobRunParameters) {
-    verifyBasicConfigValues(jobRunParameters,
-        ImmutableList.of("env1=env1Value", "env2=env2Value"));
-  }
-
-  private void verifyBasicConfigValues(RunJobParameters jobRunParameters,
-      List<String> expectedEnvs) {
-    assertEquals("testInputPath", jobRunParameters.getInputPath());
-    assertEquals("testCheckpointPath", jobRunParameters.getCheckpointPath());
-    Assert.assertEquals("testDockerImage", jobRunParameters.getDockerImageName());
-
-    assertNotNull(jobRunParameters.getLocalizations());
-    assertEquals(2, jobRunParameters.getLocalizations().size());
-
-    assertNotNull(jobRunParameters.getQuicklinks());
-    assertEquals(2, jobRunParameters.getQuicklinks().size());
-
-    assertTrue(SubmarineLogs.isVerbose());
-    assertTrue(jobRunParameters.isWaitJobFinish());
-
-    for (String env : expectedEnvs) {
-      assertTrue(String.format(
-          "%s should be in env list of jobRunParameters!", env),
-          jobRunParameters.getEnvars().contains(env));
-    }
-  }
-
-  private PyTorchRunJobParameters verifyWorkerCommonValues(RunJobParameters
-      jobRunParameters, String prefix) {
-    assertTrue(RunJobParameters.class + " must be an instance of " +
-            PyTorchRunJobParameters.class,
-        jobRunParameters instanceof PyTorchRunJobParameters);
-    PyTorchRunJobParameters pyTorchParams =
-        (PyTorchRunJobParameters) jobRunParameters;
-
-    assertEquals(3, pyTorchParams.getNumWorkers());
-    assertEquals(prefix + "testLaunchCmdWorker",
-        pyTorchParams.getWorkerLaunchCmd());
-    assertEquals(prefix + "testDockerImageWorker",
-        pyTorchParams.getWorkerDockerImage());
-    return pyTorchParams;
-  }
-
-  private void verifyWorkerValues(RunJobParameters jobRunParameters,
-      String prefix) {
-    PyTorchRunJobParameters pyTorchParams = verifyWorkerCommonValues
-        (jobRunParameters, prefix);
-    assertEquals(Resources.createResource(20480, 32),
-        pyTorchParams.getWorkerResource());
-  }
-
-  private void verifyWorkerValuesWithGpu(RunJobParameters jobRunParameters,
-      String prefix) {
-
-    PyTorchRunJobParameters pyTorchParams = verifyWorkerCommonValues
-        (jobRunParameters, prefix);
-    Resource workResource = Resources.createResource(20480, 32);
-    ResourceUtils.setResource(workResource, ResourceUtils.GPU_URI, 2);
-    assertEquals(workResource, pyTorchParams.getWorkerResource());
-  }
-
-  private void verifySecurityValues(RunJobParameters jobRunParameters) {
-    assertEquals("keytabPath", jobRunParameters.getKeytab());
-    assertEquals("testPrincipal", jobRunParameters.getPrincipal());
-    assertTrue(jobRunParameters.isDistributeKeytab());
-  }
-
-  @Test
-  public void testValidYamlParsing() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/valid-config.yaml");
-    runJobCli.run(
-        new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters);
-    verifyWorkerValues(jobRunParameters, "");
-    verifySecurityValues(jobRunParameters);
-  }
-
-  @Test
-  public void testValidGpuYamlParsing() throws Exception {
-    try {
-      ResourceUtils.configureResourceType(ResourceUtils.GPU_URI);
-    } catch (SubmarineRuntimeException e) {
-      LOG.info("The hadoop dependency doesn't support gpu resource, " +
-          "so just skip this test case.");
-      return;
-    }
-
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/valid-gpu-config.yaml");
-    runJobCli.run(
-        new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters);
-    verifyWorkerValuesWithGpu(jobRunParameters, "");
-    verifySecurityValues(jobRunParameters);
-  }
-
-  @Test
-  public void testRoleOverrides() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/valid-config-with-overrides.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters);
-    verifyWorkerValues(jobRunParameters, OVERRIDDEN_PREFIX);
-    verifySecurityValues(jobRunParameters);
-  }
-
-  @Test
-  public void testMissingPrincipalUnderSecuritySection() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/security-principal-is-missing.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters);
-    verifyWorkerValues(jobRunParameters, "");
-
-    //Verify security values
-    assertEquals("keytabPath", jobRunParameters.getKeytab());
-    assertNull("Principal should be null!", jobRunParameters.getPrincipal());
-    assertTrue(jobRunParameters.isDistributeKeytab());
-  }
-
-  @Test
-  public void testMissingEnvs() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/envs-are-missing.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters, ImmutableList.of());
-    verifyWorkerValues(jobRunParameters, "");
-    verifySecurityValues(jobRunParameters);
-  }
-
-  @Test
-  public void testInvalidConfigPsSectionIsDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
-    exception.expect(YamlParseException.class);
-    exception.expectMessage("PS section should not be defined " +
-        "when PyTorch is the selected framework");
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/invalid-config-ps-section.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-
-  @Test
-  public void testInvalidConfigTensorboardSectionIsDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
-    exception.expect(YamlParseException.class);
-    exception.expectMessage("TensorBoard section should not be defined " +
-        "when TensorFlow is not the selected framework!");
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/invalid-config-tensorboard-section.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-
-  @Test
-  public void testInvalidConfigSchedulerSectionIsDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
-    exception.expect(YamlParseException.class);
-    exception.expectMessage("Scheduler section should not be defined " +
-        "when MXNet is not the selected framework!");
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-         DIR_NAME + "/invalid-config-scheduler-section.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowTest.java
deleted file mode 100644
index 1dfaf42..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.tensorflow;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
-import org.apache.submarine.client.cli.runjob.RunJobCli;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test class that verifies the correctness of TensorFlow
- * CLI configuration parsing.
- */
-public class RunJobCliParsingTensorFlowTest {
-
-  @Before
-  public void before() {
-    SubmarineLogs.verboseOff();
-  }
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  @Test
-  public void testNoInputPathOptionSpecified() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    String expectedErrorMessage = "\"--" + CliConstants.INPUT_PATH +
-        "\" is absent";
-    String actualMessage = "";
-    try {
-      runJobCli.run(
-          new String[]{"--framework", "tensorflow",
-              "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-              "--checkpoint_path", "hdfs://output",
-              "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
-              "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
-              "true", "--verbose", "--wait_job_finish"});
-    } catch (ParseException e) {
-      actualMessage = e.getMessage();
-      e.printStackTrace();
-    }
-    assertEquals(expectedErrorMessage, actualMessage);
-  }
-
-  @Test
-  public void testBasicRunJobForDistributedTraining() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
-    assertFalse(SubmarineLogs.isVerbose());
-
-    runJobCli.run(
-        new String[] { "--framework", "tensorflow",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input",
-            "--checkpoint_path", "hdfs://output",
-            "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
-            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
-            "--ps_resources", "memory=4G,vcores=4", "--tensorboard", "true",
-            "--ps_launch_cmd", "python run-ps.py", "--keytab", "/keytab/path",
-            "--principal", "user/_HOST@domain.com", "--distribute_keytab",
-            "--verbose" });
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    assertTrue(RunJobParameters.class +
-        " must be an instance of " +
-        TensorFlowRunJobParameters.class,
-        jobRunParameters instanceof TensorFlowRunJobParameters);
-    TensorFlowRunJobParameters tensorFlowParams =
-        (TensorFlowRunJobParameters) jobRunParameters;
-
-    assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
-    assertEquals(jobRunParameters.getCheckpointPath(), "hdfs://output");
-    assertEquals(tensorFlowParams.getNumPS(), 2);
-    assertEquals(tensorFlowParams.getPSLaunchCmd(), "python run-ps.py");
-    assertEquals(Resources.createResource(4096, 4),
-        tensorFlowParams.getPsResource());
-    assertEquals(tensorFlowParams.getWorkerLaunchCmd(),
-        "python run-job.py");
-    assertEquals(Resources.createResource(2048, 2),
-        tensorFlowParams.getWorkerResource());
-    assertEquals(jobRunParameters.getDockerImageName(),
-        "tf-docker:1.1.0");
-    assertEquals(jobRunParameters.getKeytab(),
-        "/keytab/path");
-    assertEquals(jobRunParameters.getPrincipal(),
-        "user/_HOST@domain.com");
-    assertTrue(jobRunParameters.isDistributeKeytab());
-    assertTrue(SubmarineLogs.isVerbose());
-  }
-
-  @Test
-  public void testBasicRunJobForSingleNodeTraining() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    runJobCli.run(
-        new String[] { "--framework", "tensorflow",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input", "--checkpoint_path",
-            "hdfs://output",
-            "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
-            "true", "--verbose", "--wait_job_finish" });
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    assertTrue(RunJobParameters.class +
-            " must be an instance of " +
-            TensorFlowRunJobParameters.class,
-        jobRunParameters instanceof TensorFlowRunJobParameters);
-    TensorFlowRunJobParameters tensorFlowParams =
-        (TensorFlowRunJobParameters) jobRunParameters;
-
-    assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
-    assertEquals(jobRunParameters.getCheckpointPath(), "hdfs://output");
-    assertEquals(tensorFlowParams.getNumWorkers(), 1);
-    assertEquals(tensorFlowParams.getWorkerLaunchCmd(),
-        "python run-job.py");
-    assertEquals(Resources.createResource(4096, 2),
-        tensorFlowParams.getWorkerResource());
-    assertTrue(SubmarineLogs.isVerbose());
-    assertTrue(jobRunParameters.isWaitJobFinish());
-  }
-
-  /**
-   * when only run tensorboard, input_path is not needed
-   * */
-  @Test
-  public void testNoInputPathOptionButOnlyRunTensorboard() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    boolean success = true;
-    try {
-      runJobCli.run(
-          new String[]{"--framework", "tensorflow",
-              "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-              "--num_workers", "0", "--tensorboard", "--verbose",
-              "--tensorboard_resources", "memory=2G,vcores=2",
-              "--tensorboard_docker_image", "tb_docker_image:001"});
-    } catch (ParseException e) {
-      success = false;
-    }
-    assertTrue(success);
-  }
-
-  @Test
-  public void testNumSchedulerCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for TensorFlow jobs");
-    runJobCli.run(
-        new String[] {"--framework", "tensorflow",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
-            "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=4g,vcores=2", "--tensorboard", "true",
-            "--verbose", "--wait_job_finish", "--num_schedulers", "1"});
-  }
-
-  @Test
-  public void testSchedulerResourcesCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for TensorFlow jobs");
-    runJobCli.run(
-        new String[] {"--framework", "tensorflow",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
-            "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=4g,vcores=2", "--tensorboard", "true",
-            "--verbose", "--wait_job_finish",
-            "--scheduler_resources", "memory=2048M,vcores=2"});
-  }
-
-  @Test
-  public void testSchedulerDockerImageCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for TensorFlow jobs");
-    runJobCli.run(
-        new String[] {"--framework", "tensorflow",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
-            "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=4g,vcores=2", "--tensorboard", "true",
-            "--verbose", "--wait_job_finish",
-            "--scheduler_docker_image", "schedulerDockerImage"});
-  }
-
-  @Test
-  public void testSchedulerLaunchCommandCannotBeDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    assertFalse(SubmarineLogs.isVerbose());
-
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("cannot be defined for TensorFlow jobs");
-    runJobCli.run(
-        new String[] {"--framework", "tensorflow",
-            "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
-            "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
-            "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=4g,vcores=2", "--tensorboard", "true",
-            "--verbose", "--wait_job_finish",
-            "--scheduler_launch_cmd", "schedulerLaunchCommand"});
-  }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowYamlStandaloneTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowYamlStandaloneTest.java
deleted file mode 100644
index 4fb4b3e..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowYamlStandaloneTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.tensorflow;
-
-import org.apache.submarine.client.cli.param.yaml.Configs;
-import org.apache.submarine.client.cli.param.yaml.Role;
-import org.apache.submarine.client.cli.param.yaml.Roles;
-import org.apache.submarine.client.cli.param.yaml.Scheduling;
-import org.apache.submarine.client.cli.param.yaml.Security;
-import org.apache.submarine.client.cli.param.yaml.Spec;
-import org.apache.submarine.client.cli.param.yaml.TensorBoard;
-import org.apache.submarine.client.cli.param.yaml.YamlConfigFile;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.client.cli.YamlConfigTestUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test class that verifies the correctness of YAML configuration parsing.
- * Please note that this class just tests YAML parsing,
- * but only in an isolated fashion.
- */
-public class RunJobCliParsingTensorFlowYamlStandaloneTest {
-  private static final String OVERRIDDEN_PREFIX = "overridden_";
-  private static final String DIR_NAME = "runjob-tensorflow-yaml";
-
-  private void verifyBasicConfigValues(YamlConfigFile yamlConfigFile) {
-    assertNotNull("Spec file should not be null!", yamlConfigFile);
-    Spec spec = yamlConfigFile.getSpec();
-    assertNotNull("Spec should not be null!", spec);
-
-    assertEquals("testJobName", spec.getName());
-    assertEquals("testJobType", spec.getJobType());
-
-    Configs configs = yamlConfigFile.getConfigs();
-    assertNotNull("Configs should not be null!", configs);
-
-    assertEquals("testInputPath", configs.getInputPath());
-    assertEquals("testCheckpointPath", configs.getCheckpointPath());
-    assertEquals("testSavedModelPath", configs.getSavedModelPath());
-    assertEquals("testDockerImage", configs.getDockerImage());
-
-    Map<String, String> envs = configs.getEnvs();
-    assertNotNull("Envs should not be null!", envs);
-    assertEquals(2, envs.size());
-    assertEquals("env1Value", envs.get("env1"));
-    assertEquals("env2Value", envs.get("env2"));
-
-    List<String> localizations = configs.getLocalizations();
-    assertNotNull("Localizations should not be null!", localizations);
-    assertEquals("Size of localizations must be 2!", 2, localizations.size());
-    assertEquals("hdfs://remote-file1:/local-filename1:rw",
-        localizations.get(0));
-    assertEquals("nfs://remote-file2:/local-filename2:rw",
-        localizations.get(1));
-
-    List<String> mounts = configs.getMounts();
-    assertNotNull("Mounts should not be null!", mounts);
-    assertEquals("Size of mounts must be 2!", 2, mounts.size());
-    assertEquals("/etc/passwd:/etc/passwd:rw", mounts.get(0));
-    assertEquals("/etc/hosts:/etc/hosts:rw", mounts.get(1));
-
-    assertTrue(
-        configs.getQuicklinks().contains("Notebook_UI=https://master-0:7070"));
-    assertTrue(
-        configs.getQuicklinks().contains("Notebook_UI2=https://master-0:7071"));
-    assertEquals("true", configs.getWaitJobFinish());
-  }
-
-  private void assertRoleConfigOverrides(Role role, String prefix,
-                                         String roleType) {
-    assertNotNull(roleType + " role should not be null!", role);
-
-    assertEquals(String.format("%stestDockerImage%s", prefix, roleType),
-        role.getDockerImage());
-
-    //envs, localizations and mounts for Roles
-    // are only present in valid-config-with-overrides.yaml
-    boolean validateAll = !prefix.equals("");
-    if (validateAll) {
-      Map<String, String> envs = role.getEnvs();
-      assertNotNull("Envs should not be null!", envs);
-      assertEquals(String.format("%senv1%s", prefix, roleType),
-          envs.get("env1"));
-      assertEquals(String.format("%senv2%s", prefix, roleType),
-          envs.get("env2"));
-    }
-
-    if (validateAll) {
-      List<String> localizations = role.getLocalizations();
-      assertNotNull("Localizations should not be null!", localizations);
-      assertEquals("Size of localizations must be 2!", 2, localizations.size());
-      assertEquals(String.format("hdfs://remote-file1:/%slocal" +
-          "-filename1%s:rw", prefix, roleType), localizations.get(0));
-      assertEquals(String.format("nfs://remote-file2:/%slocal" +
-          "-filename2%s:rw", prefix, roleType), localizations.get(1));
-    }
-
-    if (validateAll) {
-      List<String> mounts = role.getMounts();
-      assertNotNull("Mounts should not be null!", mounts);
-      assertEquals("Size of mounts must be 2!", 2, mounts.size());
-      assertEquals(String.format("/etc/passwd:/%s%s", prefix, roleType),
-          mounts.get(0));
-      assertEquals(String.format("/etc/hosts:/%s%s", prefix, roleType),
-          mounts.get(1));
-    }
-  }
-
-  private void assertWorkerValues(Role worker) {
-    assertEquals("testLaunchCmdWorker", worker.getLaunchCmd());
-    assertEquals("testDockerImageWorker", worker.getDockerImage());
-    assertEquals("memory=20480M,vcores=32", worker.getResources());
-    assertEquals(3, worker.getReplicas());
-  }
-
-  private void assertPsValues(Role ps) {
-    assertEquals("testLaunchCmdPs", ps.getLaunchCmd());
-    assertEquals("testDockerImagePs", ps.getDockerImage());
-    assertEquals("memory=20500M,vcores=34", ps.getResources());
-    assertEquals(4, ps.getReplicas());
-  }
-
-  private void verifySchedulingValues(YamlConfigFile yamlConfigFile) {
-    Scheduling scheduling = yamlConfigFile.getScheduling();
-    assertNotNull("Scheduling should not be null!", scheduling);
-    assertEquals("queue1", scheduling.getQueue());
-  }
-
-  private void verifySecurityValues(YamlConfigFile yamlConfigFile) {
-    Security security = yamlConfigFile.getSecurity();
-    assertNotNull("Security should not be null!", security);
-    assertEquals("keytabPath", security.getKeytab());
-    assertEquals("testPrincipal", security.getPrincipal());
-    assertTrue(security.isDistributeKeytab());
-  }
-
-  private void verifyTensorboardValues(YamlConfigFile yamlConfigFile) {
-    TensorBoard tensorBoard = yamlConfigFile.getTensorBoard();
-    assertNotNull("Tensorboard should not be null!", tensorBoard);
-    assertEquals("tensorboardDockerImage", tensorBoard.getDockerImage());
-    assertEquals("memory=21000M,vcores=37", tensorBoard.getResources());
-  }
-
-  @Before
-  public void before() {
-    SubmarineLogs.verboseOff();
-  }
-
-  @Test
-  public void testLaunchCommandYaml() {
-    YamlConfigFile yamlConfigFile = YamlConfigTestUtils.readYamlConfigFile(DIR_NAME +
-        "/valid-config.yaml");
-
-    verifyBasicConfigValues(yamlConfigFile);
-
-    Roles roles = yamlConfigFile.getRoles();
-    assertNotNull("Roles should not be null!", roles);
-    assertRoleConfigOverrides(roles.getWorker(), "", "Worker");
-    assertRoleConfigOverrides(roles.getPs(), "", "Ps");
-
-    assertWorkerValues(roles.getWorker());
-    assertPsValues(roles.getPs());
-
-    verifySchedulingValues(yamlConfigFile);
-    verifySecurityValues(yamlConfigFile);
-    verifyTensorboardValues(yamlConfigFile);
-  }
-
-  @Test
-  public void testOverrides() {
-    YamlConfigFile yamlConfigFile = YamlConfigTestUtils.readYamlConfigFile(DIR_NAME +
-        "/valid-config-with-overrides.yaml");
-
-    verifyBasicConfigValues(yamlConfigFile);
-
-    Roles roles = yamlConfigFile.getRoles();
-    assertNotNull("Roles should not be null!", roles);
-    assertRoleConfigOverrides(roles.getWorker(), OVERRIDDEN_PREFIX, "Worker");
-    assertRoleConfigOverrides(roles.getPs(), OVERRIDDEN_PREFIX, "Ps");
-  }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowYamlTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowYamlTest.java
deleted file mode 100644
index 894fc08..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowYamlTest.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.tensorflow;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.submarine.client.cli.YamlConfigTestUtils;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
-import org.apache.submarine.client.cli.param.yaml.YamlParseException;
-import org.apache.submarine.client.cli.runjob.RunJobCli;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test class that verifies the correctness of TF YAML configuration parsing.
- */
-public class RunJobCliParsingTensorFlowYamlTest {
-  private static final String OVERRIDDEN_PREFIX = "overridden_";
-  private static final String DIR_NAME = "runjob-tensorflow-yaml";
-  private File yamlConfig;
-  private static Logger LOG = LoggerFactory.getLogger(
-      RunJobCliParsingTensorFlowYamlTest.class);
-
-  @Before
-  public void before() {
-    SubmarineLogs.verboseOff();
-  }
-
-  @After
-  public void after() {
-    YamlConfigTestUtils.deleteFile(yamlConfig);
-  }
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  private void verifyBasicConfigValues(RunJobParameters jobRunParameters) {
-    verifyBasicConfigValues(jobRunParameters,
-        ImmutableList.of("env1=env1Value", "env2=env2Value"));
-  }
-
-  private void verifyBasicConfigValues(RunJobParameters jobRunParameters,
-      List<String> expectedEnvs) {
-    assertEquals("testInputPath", jobRunParameters.getInputPath());
-    assertEquals("testCheckpointPath", jobRunParameters.getCheckpointPath());
-    assertEquals("testDockerImage", jobRunParameters.getDockerImageName());
-
-    assertNotNull(jobRunParameters.getLocalizations());
-    assertEquals(2, jobRunParameters.getLocalizations().size());
-
-    assertNotNull(jobRunParameters.getQuicklinks());
-    assertEquals(2, jobRunParameters.getQuicklinks().size());
-
-    assertTrue(SubmarineLogs.isVerbose());
-    assertTrue(jobRunParameters.isWaitJobFinish());
-
-    for (String env : expectedEnvs) {
-      assertTrue(String.format(
-          "%s should be in env list of jobRunParameters!", env),
-          jobRunParameters.getEnvars().contains(env));
-    }
-  }
-
-  private void verifyPsValues(RunJobParameters jobRunParameters,
-      String prefix) {
-    assertTrue(RunJobParameters.class + " must be an instance of " +
-            TensorFlowRunJobParameters.class,
-        jobRunParameters instanceof TensorFlowRunJobParameters);
-    TensorFlowRunJobParameters tensorFlowParams =
-        (TensorFlowRunJobParameters) jobRunParameters;
-
-    assertEquals(4, tensorFlowParams.getNumPS());
-    assertEquals(prefix + "testLaunchCmdPs", tensorFlowParams.getPSLaunchCmd());
-    assertEquals(prefix + "testDockerImagePs",
-        tensorFlowParams.getPsDockerImage());
-    assertEquals(Resources.createResource(20500, 34),
-        tensorFlowParams.getPsResource());
-  }
-
-  private TensorFlowRunJobParameters verifyWorkerCommonValues(
-      RunJobParameters jobRunParameters, String prefix) {
-    assertTrue(RunJobParameters.class + " must be an instance of " +
-            TensorFlowRunJobParameters.class,
-        jobRunParameters instanceof TensorFlowRunJobParameters);
-    TensorFlowRunJobParameters tensorFlowParams =
-        (TensorFlowRunJobParameters) jobRunParameters;
-
-    assertEquals(3, tensorFlowParams.getNumWorkers());
-    assertEquals(prefix + "testLaunchCmdWorker",
-        tensorFlowParams.getWorkerLaunchCmd());
-    assertEquals(prefix + "testDockerImageWorker",
-        tensorFlowParams.getWorkerDockerImage());
-    return tensorFlowParams;
-  }
-
-  private void verifyWorkerValues(RunJobParameters jobRunParameters,
-      String prefix) {
-    TensorFlowRunJobParameters tensorFlowParams = verifyWorkerCommonValues
-        (jobRunParameters, prefix);
-    assertEquals(Resources.createResource(20480, 32),
-        tensorFlowParams.getWorkerResource());
-  }
-
-  private void verifyWorkerValuesWithGpu(RunJobParameters jobRunParameters,
-                                  String prefix) {
-    TensorFlowRunJobParameters tensorFlowParams = verifyWorkerCommonValues
-        (jobRunParameters, prefix);
-    Resource workResource = Resources.createResource(20480, 32);
-    ResourceUtils.setResource(workResource, ResourceUtils.GPU_URI, 2);
-    assertEquals(workResource, tensorFlowParams.getWorkerResource());
-  }
-
-  private void verifySecurityValues(RunJobParameters jobRunParameters) {
-    assertEquals("keytabPath", jobRunParameters.getKeytab());
-    assertEquals("testPrincipal", jobRunParameters.getPrincipal());
-    assertTrue(jobRunParameters.isDistributeKeytab());
-  }
-
-  private void verifyTensorboardValues(RunJobParameters jobRunParameters) {
-    assertTrue(RunJobParameters.class + " must be an instance of " +
-            TensorFlowRunJobParameters.class,
-        jobRunParameters instanceof TensorFlowRunJobParameters);
-    TensorFlowRunJobParameters tensorFlowParams =
-        (TensorFlowRunJobParameters) jobRunParameters;
-
-    assertTrue(tensorFlowParams.isTensorboardEnabled());
-    assertEquals("tensorboardDockerImage",
-        tensorFlowParams.getTensorboardDockerImage());
-    assertEquals(Resources.createResource(21000, 37),
-        tensorFlowParams.getTensorboardResource());
-  }
-
-  @Test
-  public void testValidYamlParsing() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/valid-config.yaml");
-    runJobCli.run(
-        new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters);
-    verifyPsValues(jobRunParameters, "");
-    verifyWorkerValues(jobRunParameters, "");
-    verifySecurityValues(jobRunParameters);
-    verifyTensorboardValues(jobRunParameters);
-  }
-
-  @Test
-  public void testValidGpuYamlParsing() throws Exception {
-    try {
-      ResourceUtils.configureResourceType(ResourceUtils.GPU_URI);
-    } catch (SubmarineRuntimeException e) {
-      LOG.info("The hadoop dependency doesn't support gpu resource, " +
-          "so just skip this test case.");
-      return;
-    }
-
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/valid-gpu-config.yaml");
-    runJobCli.run(
-        new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters);
-    verifyPsValues(jobRunParameters, "");
-    verifyWorkerValuesWithGpu(jobRunParameters, "");
-    verifySecurityValues(jobRunParameters);
-    verifyTensorboardValues(jobRunParameters);
-  }
-
-  @Test
-  public void testRoleOverrides() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-    Assert.assertFalse(SubmarineLogs.isVerbose());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/valid-config-with-overrides.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters);
-    verifyPsValues(jobRunParameters, OVERRIDDEN_PREFIX);
-    verifyWorkerValues(jobRunParameters, OVERRIDDEN_PREFIX);
-    verifySecurityValues(jobRunParameters);
-    verifyTensorboardValues(jobRunParameters);
-  }
-
-  @Test
-  public void testMissingPrincipalUnderSecuritySection() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/security-principal-is-missing.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters);
-    verifyPsValues(jobRunParameters, "");
-    verifyWorkerValues(jobRunParameters, "");
-    verifyTensorboardValues(jobRunParameters);
-
-    //Verify security values
-    assertEquals("keytabPath", jobRunParameters.getKeytab());
-    assertNull("Principal should be null!", jobRunParameters.getPrincipal());
-    assertTrue(jobRunParameters.isDistributeKeytab());
-  }
-
-  @Test
-  public void testMissingTensorBoardDockerImage() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/tensorboard-dockerimage-is-missing.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-
-    verifyBasicConfigValues(jobRunParameters);
-    verifyPsValues(jobRunParameters, "");
-    verifyWorkerValues(jobRunParameters, "");
-    verifySecurityValues(jobRunParameters);
-
-    TensorFlowRunJobParameters tensorFlowParams =
-        (TensorFlowRunJobParameters) jobRunParameters;
-
-    assertTrue(tensorFlowParams.isTensorboardEnabled());
-    assertNull("tensorboardDockerImage should be null!",
-        tensorFlowParams.getTensorboardDockerImage());
-    assertEquals(Resources.createResource(21000, 37),
-        tensorFlowParams.getTensorboardResource());
-  }
-
-  @Test
-  public void testMissingEnvs() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/envs-are-missing.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
-    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-    verifyBasicConfigValues(jobRunParameters, ImmutableList.of());
-    verifyPsValues(jobRunParameters, "");
-    verifyWorkerValues(jobRunParameters, "");
-    verifySecurityValues(jobRunParameters);
-    verifyTensorboardValues(jobRunParameters);
-  }
-
-  @Test
-  public void testInvalidConfigSchedulerSectionIsDefined() throws Exception {
-    RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
-    exception.expect(YamlParseException.class);
-    exception.expectMessage("Scheduler section should not be defined " +
-        "when MXNet is not the selected framework!");
-    yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
-        DIR_NAME + "/invalid-config-scheduler-section.yaml");
-    runJobCli.run(
-        new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-  }
-}
diff --git a/submarine-commons/commons-runtime/pom.xml b/submarine-commons/commons-runtime/pom.xml
index f8ab7dd..e17cd7c 100644
--- a/submarine-commons/commons-runtime/pom.xml
+++ b/submarine-commons/commons-runtime/pom.xml
@@ -76,121 +76,6 @@
       <artifactId>jackson-core</artifactId>
     </dependency>
 
-    <!-- Dependencies for Hadoop -->
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.commons</groupId>
-          <artifactId>commons-lang3</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.httpcomponents</groupId>
-          <artifactId>httpclient</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.httpcomponents</groupId>
-          <artifactId>httpcore</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-lang</groupId>
-          <artifactId>commons-lang</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-collections</groupId>
-          <artifactId>commons-collections</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-logging</groupId>
-          <artifactId>commons-logging</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-codec</groupId>
-          <artifactId>commons-codec</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>jackson-core-asl</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>jackson-mapper-asl</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.code.findbugs</groupId>
-          <artifactId>jsr305</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-api</artifactId>
-      <version>${hadoop.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>com.google.code.findbugs</groupId>
-          <artifactId>jsr305</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-client</artifactId>
-      <version>${hadoop.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>jackson-jaxrs</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>jackson-xc</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>jackson-mapper-asl</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>jackson-core-asl</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.commons</groupId>
-          <artifactId>commons-lang3</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-codec</groupId>
-          <artifactId>commons-codec</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-logging</groupId>
-          <artifactId>commons-logging</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
   </dependencies>
 
   <build>
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/ClientContext.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/ClientContext.java
deleted file mode 100644
index a2a07c1..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/ClientContext.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime;
-
-import org.apache.submarine.commons.utils.SubmarineConfiguration;
-import org.apache.submarine.commons.runtime.fs.DefaultRemoteDirectoryManager;
-import org.apache.submarine.commons.runtime.fs.RemoteDirectoryManager;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-public class ClientContext {
-  private Configuration yarnConf = new YarnConfiguration();
-
-  private volatile RemoteDirectoryManager remoteDirectoryManager;
-  private YarnClient yarnClient;
-  private SubmarineConfiguration submarineConfig;
-  private RuntimeFactory runtimeFactory;
-
-  public ClientContext() {
-    submarineConfig = SubmarineConfiguration.getInstance();
-  }
-
-  public synchronized YarnClient getOrCreateYarnClient() {
-    if (yarnClient == null) {
-      yarnClient = YarnClient.createYarnClient();
-      yarnClient.init(yarnConf);
-      yarnClient.start();
-    }
-    return yarnClient;
-  }
-
-  public Configuration getYarnConfig() {
-    return yarnConf;
-  }
-
-  public void setYarnConfig(Configuration conf) {
-    this.yarnConf = conf;
-  }
-
-  public RemoteDirectoryManager getRemoteDirectoryManager() {
-    if (remoteDirectoryManager == null) {
-      synchronized (this) {
-        if (remoteDirectoryManager == null) {
-          remoteDirectoryManager = new DefaultRemoteDirectoryManager(this);
-        }
-      }
-    }
-    return remoteDirectoryManager;
-  }
-
-  public SubmarineConfiguration getSubmarineConfig() {
-    return submarineConfig;
-  }
-
-  public void setSubmarineConfig(SubmarineConfiguration submarineConfig) {
-    this.submarineConfig = submarineConfig;
-  }
-
-  public RuntimeFactory getRuntimeFactory() {
-    return runtimeFactory;
-  }
-
-  public void setRuntimeFactory(RuntimeFactory runtimeFactory) {
-    this.runtimeFactory = runtimeFactory;
-  }
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/JobMonitor.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/JobMonitor.java
deleted file mode 100644
index ca6bfc6..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/JobMonitor.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime;
-
-import org.apache.submarine.commons.runtime.api.JobState;
-import org.apache.submarine.commons.runtime.api.JobStatus;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Monitor status of job(s)
- */
-public abstract class JobMonitor {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(JobMonitor.class);
-  protected ClientContext clientContext;
-
-  public JobMonitor(ClientContext clientContext) {
-    this.clientContext = clientContext;
-  }
-
-  /**
-   * Returns status of training job.
-   *
-   * @param jobName name of job
-   * @return job status
-   * @throws IOException anything else happens
-   * @throws YarnException anything related to YARN happens
-   */
-  public abstract JobStatus getTrainingJobStatus(String jobName)
-      throws IOException, YarnException;
-
-  /**
-   * Cleanup AppAdminClient, etc.
-   */
-  public void cleanup() throws IOException {}
-
-  /**
-   * Continue wait and print status if job goes to ready or final state.
-   * @param jobName
-   * @throws IOException
-   * @throws YarnException
-   * @throws SubmarineException
-   */
-  public void waitTrainingFinal(String jobName)
-      throws IOException, YarnException, SubmarineException {
-    // Wait 5 sec between each fetch.
-    int waitIntervalSec = 5;
-    JobStatus js;
-    while (true) {
-      js = getTrainingJobStatus(jobName);
-      JobState jobState = js.getState();
-      js.nicePrint(System.err);
-
-      if (JobState.isFinal(jobState)) {
-        if (jobState.equals(JobState.FAILED)) {
-          throw new SubmarineException("Job failed");
-        } else if (jobState.equals(JobState.KILLED)) {
-          throw new SubmarineException("Job killed");
-        }
-        LOG.info("Job exited with state=" + jobState);
-        break;
-      }
-
-      try {
-        Thread.sleep(waitIntervalSec * 1000);
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      }
-    }
-    cleanup();
-  }
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/JobSubmitter.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/JobSubmitter.java
deleted file mode 100644
index a7d23ca..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/JobSubmitter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-import org.apache.submarine.commons.runtime.param.Parameter;
-
-import java.io.IOException;
-
-/**
- * Submit job to cluster master.
- */
-public interface JobSubmitter {
-  /**
-   * Submit a job to cluster.
-   * @param parameters run job parameters
-   * @return applicationId when successfully submitted
-   * @throws YarnException for issues while contacting YARN daemons
-   * @throws IOException for other issues.
-   */
-  ApplicationId submitJob(Parameter parameters)
-          throws IOException, YarnException, SubmarineException;
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/RuntimeFactory.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/RuntimeFactory.java
deleted file mode 100644
index f6a65ad..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/RuntimeFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.submarine.commons.utils.SubmarineConfiguration;
-import org.apache.submarine.commons.utils.SubmarineConfVars;
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.submarine.commons.runtime.fs.SubmarineStorage;
-
-import java.lang.reflect.InvocationTargetException;
-
-public abstract class RuntimeFactory {
-  protected ClientContext clientContext;
-  private JobSubmitter jobSubmitter;
-  private JobMonitor jobMonitor;
-  private SubmarineStorage submarineStorage;
-
-  public RuntimeFactory(ClientContext clientContext) {
-    this.clientContext = clientContext;
-  }
-
-  public static RuntimeFactory getRuntimeFactory(
-      ClientContext clientContext, ClassLoader classLoader) {
-    SubmarineConfiguration submarineConfiguration =
-        clientContext.getSubmarineConfig();
-    String runtimeClass = submarineConfiguration.getString(
-        SubmarineConfVars.ConfVars.SUBMARINE_RUNTIME_CLASS);
-
-    try {
-      Class<?> runtimeClazz = Class.forName(runtimeClass, true, classLoader);
-      if (RuntimeFactory.class.isAssignableFrom(runtimeClazz)) {
-        return (RuntimeFactory) runtimeClazz.getConstructor(ClientContext.class).newInstance(clientContext);
-      } else {
-        throw new SubmarineRuntimeException("Class: " + runtimeClass
-            + " not instance of " + RuntimeFactory.class.getCanonicalName());
-      }
-    } catch (ClassNotFoundException | IllegalAccessException |
-        InstantiationException | NoSuchMethodException |
-        InvocationTargetException e) {
-      throw new SubmarineRuntimeException(
-          "Could not instantiate RuntimeFactory: " + runtimeClass, e);
-    }
-  }
-
-  protected abstract JobSubmitter internalCreateJobSubmitter();
-
-  protected abstract JobMonitor internalCreateJobMonitor();
-
-  protected abstract SubmarineStorage internalCreateSubmarineStorage();
-
-  public synchronized JobSubmitter getJobSubmitterInstance() {
-    if (jobSubmitter == null) {
-      jobSubmitter = internalCreateJobSubmitter();
-    }
-    return jobSubmitter;
-  }
-
-  public synchronized JobMonitor getJobMonitorInstance() {
-    if (jobMonitor == null) {
-      jobMonitor = internalCreateJobMonitor();
-    }
-    return jobMonitor;
-  }
-
-  public synchronized SubmarineStorage getSubmarineStorage() {
-    if (submarineStorage == null) {
-      submarineStorage = internalCreateSubmarineStorage();
-    }
-    return submarineStorage;
-  }
-
-  @VisibleForTesting
-  public synchronized void setJobSubmitterInstance(JobSubmitter jobSubmitter) {
-    this.jobSubmitter = jobSubmitter;
-  }
-
-  @VisibleForTesting
-  public synchronized void setJobMonitorInstance(JobMonitor jobMonitor) {
-    this.jobMonitor = jobMonitor;
-  }
-
-  @VisibleForTesting
-  public synchronized void setSubmarineStorage(SubmarineStorage storage) {
-    this.submarineStorage = storage;
-  }
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/DefaultRemoteDirectoryManager.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/DefaultRemoteDirectoryManager.java
deleted file mode 100644
index c7de978..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/DefaultRemoteDirectoryManager.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime.fs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.submarine.commons.runtime.ClientContext;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-
-/**
- * Manages remote directories for staging, log, etc.
- * TODO(keqiu), need to properly handle permission / name validation, etc.
- */
-public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
-  private FileSystem fs;
-  private Configuration conf;
-
-  public DefaultRemoteDirectoryManager(ClientContext context) {
-    this.conf = context.getYarnConfig();
-    try {
-      this.fs = FileSystem.get(context.getYarnConfig());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Path getJobStagingArea(String jobName, boolean create)
-      throws IOException {
-    Path staging = new Path(getJobRootFolder(jobName), "staging");
-    if (create) {
-      createFolderIfNotExist(staging);
-    }
-
-    // Get a file status to make sure it is a absolute path.
-    FileStatus fStatus = fs.getFileStatus(staging);
-    return fStatus.getPath();
-  }
-
-  @Override
-  public Path getJobCheckpointDir(String jobName, boolean create)
-      throws IOException {
-    Path jobDir = new Path(getJobStagingArea(jobName, create),
-        "checkpoint_path");
-    if (create) {
-      createFolderIfNotExist(jobDir);
-    }
-    return jobDir;
-  }
-
-  @Override
-  public Path getModelDir(String modelName, boolean create)
-      throws IOException {
-    Path modelDir = new Path(new Path("submarine", "models"), modelName);
-    if (create) {
-      createFolderIfNotExist(modelDir);
-    }
-    return modelDir;
-  }
-
-  @Override
-  public FileSystem getDefaultFileSystem() {
-    return fs;
-  }
-
-  @Override
-  public FileSystem getFileSystemByUri(String uri) throws IOException {
-    return FileSystem.get(URI.create(uri), conf);
-  }
-
-  @Override
-  public Path getUserRootFolder() throws IOException {
-    Path rootPath = new Path("submarine", "jobs");
-    createFolderIfNotExist(rootPath);
-    // Get a file status to make sure it is a absolute path.
-    FileStatus fStatus = fs.getFileStatus(rootPath);
-    return fStatus.getPath();
-  }
-
-  @Override
-  public boolean isDir(String uri) throws IOException {
-    if (isRemote(uri)) {
-      return getFileSystemByUri(uri).getFileStatus(new Path(uri)).isDirectory();
-    }
-    return new File(uri).isDirectory();
-  }
-
-  @Override
-  public boolean isRemote(String uri) {
-    String scheme = new Path(uri).toUri().getScheme();
-    if (null == scheme) {
-      return false;
-    }
-    return !scheme.startsWith("file://");
-  }
-
-  @Override
-  public boolean copyRemoteToLocal(String remoteUri, String localUri)
-      throws IOException {
-    // Delete old to avoid failure in FileUtil.copy
-    File old = new File(localUri);
-    if (old.exists()) {
-      if (!FileUtil.fullyDelete(old)) {
-        throw new IOException("Failed to delete dir:"
-            + old.getAbsolutePath());
-      }
-    }
-    return FileUtil.copy(getFileSystemByUri(remoteUri), new Path(remoteUri),
-        new File(localUri), false,
-        conf);
-  }
-
-  @Override
-  public boolean existsRemoteFile(String uri) throws IOException {
-    return getFileSystemByUri(uri).exists(new Path(uri));
-  }
-
-  @Override
-  public FileStatus getRemoteFileStatus(String uri) throws IOException {
-    return getFileSystemByUri(uri).getFileStatus(new Path(uri));
-  }
-
-  @Override
-  public long getRemoteFileSize(String uri) throws IOException {
-    return getFileSystemByUri(uri)
-        .getContentSummary(new Path(uri)).getSpaceConsumed();
-  }
-
-  private Path getJobRootFolder(String jobName) throws IOException {
-    Path userRoot = getUserRootFolder();
-    Path jobRootPath = new Path(userRoot, jobName);
-    createFolderIfNotExist(jobRootPath);
-    // Get a file status to make sure it is a absolute path.
-    FileStatus fStatus = fs.getFileStatus(jobRootPath);
-    return fStatus.getPath();
-  }
-
-  private void createFolderIfNotExist(Path path) throws IOException {
-    if (!fs.exists(path)) {
-      if (!fs.mkdirs(path)) {
-        throw new IOException("Failed to create folder=" + path);
-      }
-    }
-  }
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/FSBasedSubmarineStorageImpl.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/FSBasedSubmarineStorageImpl.java
deleted file mode 100644
index befb8c9..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/FSBasedSubmarineStorageImpl.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime.fs;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.submarine.commons.runtime.ClientContext;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-
-/**
- * A super naive FS-based storage.
- */
-public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
-  RemoteDirectoryManager rdm;
-
-  public FSBasedSubmarineStorageImpl(ClientContext clientContext) {
-    rdm = clientContext.getRemoteDirectoryManager();
-  }
-
-  @Override
-  public void addNewJob(String jobName, Map<String, String> jobInfo)
-      throws IOException {
-    Path jobInfoPath = getJobInfoPath(jobName, true);
-    FSDataOutputStream fos = rdm.getDefaultFileSystem().create(jobInfoPath);
-    serializeMap(fos, jobInfo);
-  }
-
-  @Override
-  public Map<String, String> getJobInfoByName(String jobName)
-      throws IOException {
-    Path jobInfoPath = getJobInfoPath(jobName, false);
-    FSDataInputStream fis = rdm.getDefaultFileSystem().open(jobInfoPath);
-    return deserializeMap(fis);
-  }
-
-  @Override
-  public void addNewModel(String modelName, String version,
-      Map<String, String> modelInfo) throws IOException {
-    Path modelInfoPath = getModelInfoPath(modelName, version, true);
-    FSDataOutputStream fos = rdm.getDefaultFileSystem().create(modelInfoPath);
-    serializeMap(fos, modelInfo);
-  }
-
-  @Override
-  public Map<String, String> getModelInfoByName(String modelName,
-      String version) throws IOException {
-    Path modelInfoPath = getModelInfoPath(modelName, version, false);
-    FSDataInputStream fis = rdm.getDefaultFileSystem().open(modelInfoPath);
-    return deserializeMap(fis);
-  }
-
-  private Path getModelInfoPath(String modelName, String version, boolean create)
-      throws IOException {
-    Path modelDir = rdm.getModelDir(modelName, create);
-    return new Path(modelDir, version + ".info");
-  }
-
-  private void serializeMap(FSDataOutputStream fos, Map<String, String> map)
-      throws IOException {
-    ObjectOutput oo = new ObjectOutputStream(fos);
-    oo.writeObject(map);
-    oo.close();
-  }
-
-  private Map<String, String> deserializeMap(FSDataInputStream fis)
-      throws IOException {
-    ObjectInput oi = new ObjectInputStream(fis);
-    Map<String, String> newMap;
-    try {
-      newMap = (Map<String, String>) oi.readObject();
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    }
-    return newMap;
-  }
-
-  private Path getJobInfoPath(String jobName, boolean create) throws IOException {
-    Path path = rdm.getJobStagingArea(jobName, create);
-    return new Path(path, "job.info");
-  }
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/RemoteDirectoryManager.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/RemoteDirectoryManager.java
deleted file mode 100644
index bac953a..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/RemoteDirectoryManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime.fs;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-
-public interface RemoteDirectoryManager {
-  Path getJobStagingArea(String jobName, boolean create) throws IOException;
-
-  Path getJobCheckpointDir(String jobName, boolean create) throws IOException;
-
-  Path getModelDir(String modelName, boolean create) throws IOException;
-
-  FileSystem getDefaultFileSystem() throws IOException;
-
-  FileSystem getFileSystemByUri(String uri) throws IOException;
-
-  Path getUserRootFolder() throws IOException;
-
-  boolean isDir(String uri) throws IOException;
-
-  boolean isRemote(String uri) throws IOException;
-
-  boolean copyRemoteToLocal(String remoteUri, String localUri)
-      throws IOException;
-
-  boolean existsRemoteFile(String uri) throws IOException;
-
-  FileStatus getRemoteFileStatus(String uri) throws IOException;
-
-  long getRemoteFileSize(String uri) throws IOException;
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/BaseParameters.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/BaseParameters.java
deleted file mode 100644
index 85c565e..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/BaseParameters.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-
-package org.apache.submarine.commons.runtime.param;
-
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-
-import java.io.IOException;
-
-/**
- * Base class of all parameters.
- */
-public abstract class BaseParameters {
-  private String name;
-
-  public void updateParameters(Parameter parameter, ClientContext clientContext)
-      throws ParseException, IOException, YarnException {
-    String name = parameter.getOptionValue("name");
-    if (name == null) {
-      throw new ParseException("--name is absent");
-    }
-
-    if (parameter.hasOption("verbose")) {
-      SubmarineLogs.verboseOn();
-    }
-
-    this.setName(name);
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public BaseParameters setName(String name) {
-    this.name = name;
-    return this;
-  }
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/Parameter.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/Parameter.java
deleted file mode 100644
index 95fffe1..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/Parameter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime.param;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.commons.runtime.Framework;
-
-import java.util.List;
-
-public interface Parameter {
-  /**
-   * Get the ML framework
-   * @return Framework
-   */
-  Framework getFramework();
-
-  Parameter setFramework(Framework framework);
-
-  BaseParameters getParameters();
-
-  Parameter setParameters(BaseParameters parameters);
-
-  String getOptionValue(String option) throws YarnException;
-
-  boolean hasOption(String option);
-
-  List<String> getOptionValues(String option) throws YarnException;
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/resource/ResourceUtils.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/resource/ResourceUtils.java
deleted file mode 100644
index 3abb261..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/resource/ResourceUtils.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime.resource;
-
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class implements some methods with the almost the same logic as
- * org.apache.hadoop.yarn.util.resource.ResourceUtils of hadoop 3.3.
- * If the hadoop dependencies are upgraded to 3.3, this class can be refactored
- * with org.apache.hadoop.yarn.util.resource.ResourceUtils.
- */
-public final class ResourceUtils {
-
-  private static final String RES_PATTERN = "^[^=]+=\\d+\\s?\\w*$";
-  private static final String SET_RESOURCE_VALUE_METHOD = "setResourceValue";
-  private static final String SET_MEMORY_SIZE_METHOD = "setMemorySize";
-  private static final String DEPRECATED_SET_MEMORY_SIZE_METHOD =
-      "setMemory";
-  private static final String GET_MEMORY_SIZE_METHOD = "getMemorySize";
-  private static final String DEPRECATED_GET_MEMORY_SIZE_METHOD =
-      "getMemory";
-  private static final String GET_RESOURCE_VALUE_METHOD = "getResourceValue";
-  private static final String GET_RESOURCE_TYPE_METHOD =
-      "getResourcesTypeInfo";
-  private static final String GET_RESOURCES = "getResources";
-  private static final String REINITIALIZE_RESOURCES_METHOD =
-      "reinitializeResources";
-  public static final String MEMORY_URI = "memory-mb";
-  public static final String VCORES_URI = "vcores";
-  public static final String GPU_URI = "yarn.io/gpu";
-  public static final String FPGA_URI = "yarn.io/fpga";
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ResourceUtils.class);
-
-  private ResourceUtils() {
-  }
-
-  public static Resource createResourceFromString(String resourceStr) {
-    Map<String, Long> typeToValue = parseResourcesString(resourceStr);
-    return createResource(typeToValue);
-  }
-
-  public static Resource createResource(Map<String, Long> typeToValue) {
-    Resource resource = Resource.newInstance(0, 0);
-    for (Map.Entry<String, Long> entry : typeToValue.entrySet()) {
-      if (entry.getKey().equalsIgnoreCase(VCORES_URI)) {
-        resource.setVirtualCores(entry.getValue().intValue());
-        continue;
-      } else if (entry.getKey().equalsIgnoreCase(MEMORY_URI)) {
-        setMemorySize(resource, entry.getValue());
-        continue;
-      }
-      setResource(resource, entry.getKey(), entry.getValue().intValue());
-    }
-    return resource;
-  }
-
-  public static Map<String, Long> getResourceMap(Resource resource) {
-    Map<String, Long> resourceMap;
-    if (resource == null) {
-      resourceMap = new HashMap<>();
-    } else {
-      String resourceValue = resource.toString();
-      // Delete <> in the resourceValue and replace ":" with "="
-      resourceMap =
-      parseResourcesString(
-          resourceValue.substring(1, resourceValue.length() - 1)
-              .replaceAll(":", "=")
-              .replaceAll("memory", "memory-mb"));
-    }
-    return resourceMap;
-  }
-
-  private static Map<String, Long> parseResourcesString(String resourcesStr) {
-    Map<String, Long> resources = new HashMap<>();
-    String[] pairs = resourcesStr.trim().split(",");
-    for (String resource : pairs) {
-      resource = resource.trim();
-      if (!resource.matches(RES_PATTERN)) {
-        throw new IllegalArgumentException("\"" + resource + "\" is not a "
-            + "valid resource type/amount pair. "
-            + "Please provide key=amount pairs separated by commas.");
-      }
-      String[] splits = resource.split("=");
-      String key = splits[0], value = splits[1];
-      String units = getUnits(value);
-
-      String valueWithoutUnit = value.substring(0,
-          value.length() - units.length()).trim();
-      long resourceValue = Long.parseLong(valueWithoutUnit);
-
-      // Convert commandline unit to standard YARN unit.
-      if (units.equals("M") || units.equals("m")) {
-        units = "Mi";
-      } else if (units.equals("G") || units.equals("g")) {
-        units = "Gi";
-      } else if (!units.isEmpty()) {
-        throw new IllegalArgumentException("Acceptable units are M/G or empty");
-      }
-
-      // special handle memory-mb and memory
-      if (key.equals(MEMORY_URI)) {
-        if (!units.isEmpty()) {
-          resourceValue = UnitsConversionUtil.convert(units, "Mi",
-              resourceValue);
-        }
-      }
-
-      if (key.equals("memory")) {
-        key = MEMORY_URI;
-        resourceValue = UnitsConversionUtil.convert(units, "Mi",
-            resourceValue);
-      }
-
-      // special handle gpu
-      if (key.equals("gpu")) {
-        key = GPU_URI;
-      }
-
-      // special handle fpga
-      if (key.equals("fpga")) {
-        key = FPGA_URI;
-      }
-
-      resources.put(key, resourceValue);
-    }
-    return resources;
-  }
-
-  /**
-   * As hadoop 2.9.2 and lower don't support resources except cpu and memory.
-   * Use reflection to set GPU or other resources for compatibility with
-   * hadoop 2.9.2
-   */
-  public static void setResource(Resource resource, String resourceName,
-                                 int resourceValue) {
-    try {
-      Method method = resource.getClass().getMethod(SET_RESOURCE_VALUE_METHOD,
-          String.class, long.class);
-      method.invoke(resource, resourceName, resourceValue);
-    } catch (NoSuchMethodException e) {
-      LOG.error("There is no '" + SET_RESOURCE_VALUE_METHOD + "' API in this" +
-          "version of YARN", e);
-      throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      LOG.error("Failed to invoke '" + SET_RESOURCE_VALUE_METHOD +
-          "' method to set GPU resources", e);
-      throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
-    }
-    return;
-  }
-
-  public static void setMemorySize(Resource resource, Long memorySize) {
-    boolean useWithIntParameter = false;
-    // For hadoop 2.9.2 and above
-    try {
-      Method method = resource.getClass().getMethod(SET_MEMORY_SIZE_METHOD,
-          long.class);
-      method.setAccessible(true);
-      method.invoke(resource, memorySize);
-    } catch (NoSuchMethodException nsme) {
-      LOG.info("There is no '" + SET_MEMORY_SIZE_METHOD + "(long)' API in" +
-          " this version of YARN");
-      useWithIntParameter = true;
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      LOG.error("Failed to invoke '" + SET_MEMORY_SIZE_METHOD +
-          "' method", e);
-      throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
-    }
-    // For hadoop 2.7.3
-    if (useWithIntParameter) {
-      try {
-        LOG.info("Trying to use '" + DEPRECATED_SET_MEMORY_SIZE_METHOD +
-            "(int)' API for this version of YARN");
-        Method method = resource.getClass().getMethod(
-            DEPRECATED_SET_MEMORY_SIZE_METHOD, int.class);
-        method.invoke(resource, memorySize.intValue());
-      } catch (NoSuchMethodException e) {
-        LOG.error("There is no '" + DEPRECATED_SET_MEMORY_SIZE_METHOD +
-            "(int)' API in this version of YARN", e);
-        throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
-      } catch (IllegalAccessException | InvocationTargetException e) {
-        LOG.error("Failed to invoke '" + DEPRECATED_SET_MEMORY_SIZE_METHOD +
-            "' method", e);
-        throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
-      }
-    }
-  }
-
-  public static long getMemorySize(Resource resource) {
-    boolean useWithIntParameter = false;
-    long memory = 0;
-    // For hadoop 2.9.2 and above
-    try {
-      Method method = resource.getClass().getMethod(GET_MEMORY_SIZE_METHOD);
-      method.setAccessible(true);
-      memory = (long) method.invoke(resource);
-    } catch (NoSuchMethodException e) {
-      LOG.info("There is no '" + GET_MEMORY_SIZE_METHOD + "' API in" +
-          " this version of YARN");
-      useWithIntParameter = true;
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      LOG.error("Failed to invoke '" + GET_MEMORY_SIZE_METHOD +
-          "' method", e);
-      throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
-    }
-    // For hadoop 2.7.3
-    if (useWithIntParameter) {
-      try {
-        LOG.info("Trying to use '" + DEPRECATED_GET_MEMORY_SIZE_METHOD +
-            "' API for this version of YARN");
-        Method method = resource.getClass().getMethod(
-            DEPRECATED_GET_MEMORY_SIZE_METHOD);
-        method.setAccessible(true);
-        memory = ((Integer) method.invoke(resource)).longValue();
-      } catch (NoSuchMethodException e) {
-        LOG.error("There is no '" + DEPRECATED_GET_MEMORY_SIZE_METHOD +
-            "' API in this version of YARN", e);
-        throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
-      } catch (IllegalAccessException | InvocationTargetException e) {
-        LOG.error("Failed to invoke '" + DEPRECATED_GET_MEMORY_SIZE_METHOD +
-            "' method", e);
-        throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
-      }
-    }
-    return memory;
-  }
-
-  /**
-   * As hadoop 2.9.2 and lower don't support resources except cpu and memory.
-   * Use reflection to set GPU or other resources for compatibility with
-   * hadoop 2.9.2
-   */
-  public static long getResourceValue(Resource resource, String resourceName) {
-    long resourceValue = 0;
-    try {
-      Method method = resource.getClass().getMethod(GET_RESOURCE_VALUE_METHOD,
-          String.class);
-      Object value = method.invoke(resource, resourceName);
-      resourceValue = (long) value;
-    } catch (NoSuchMethodException e) {
-      LOG.info("There is no '" + GET_RESOURCE_VALUE_METHOD + "' API in this" +
-          " version of YARN");
-    } catch (InvocationTargetException e) {
-      if (e.getTargetException().getClass().getName().equals(
-          "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException")) {
-        LOG.info("Not found resource " + resourceName);
-      } else {
-        LOG.info("Failed to invoke '" + GET_RESOURCE_VALUE_METHOD + "'" +
-            " method to get resource " + resourceName);
-        throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
-      }
-    } catch (IllegalAccessException | ClassCastException e) {
-      LOG.error("Failed to invoke '" + GET_RESOURCE_VALUE_METHOD +
-          "' method to get resource " + resourceName, e);
-      throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
-    }
-    return resourceValue;
-  }
-
-  /**
-   * As hadoop 2.9.2 and lower don't support resources except cpu and memory.
-   * Use reflection to add GPU or other resources for compatibility with
-   * hadoop 2.9.2
-   */
-  public static void configureResourceType(String resrouceName) {
-    Class resourceTypeInfo;
-    try {
-      resourceTypeInfo = Class.forName(
-          "org.apache.hadoop.yarn.api.records.ResourceTypeInfo");
-      Class resourceUtils = Class.forName(
-          "org.apache.hadoop.yarn.util.resource.ResourceUtils");
-      Method method = resourceUtils.getMethod(GET_RESOURCE_TYPE_METHOD);
-      Object resTypes = method.invoke(null);
-
-      Method resourceTypeInstance = resourceTypeInfo.getMethod("newInstance",
-          String.class, String.class);
-      Object resourceType = resourceTypeInstance.invoke(null, resrouceName, "");
-      ((ArrayList) resTypes).add(resourceType);
-
-      Method reInitialMethod = resourceUtils.getMethod(
-          REINITIALIZE_RESOURCES_METHOD, List.class);
-      reInitialMethod.invoke(null, resTypes);
-
-    } catch (ClassNotFoundException e) {
-      LOG.info("There is no specified class API in this" +
-          " version of YARN");
-      LOG.info(e.getMessage());
-      throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
-    } catch (NoSuchMethodException nsme) {
-      LOG.info("There is no '" + GET_RESOURCE_VALUE_METHOD + "' or '" +
-          REINITIALIZE_RESOURCES_METHOD + "' API in this" +
-          " version of YARN");
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      LOG.info("Failed to invoke 'configureResourceType' method ", e);
-      throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
-    }
-  }
-
-  private static String getUnits(String resourceValue) {
-    return parseResourceValue(resourceValue)[0];
-  }
-
-  /**
-   * Extract unit and actual value from resource value.
-   * @param resourceValue Value of the resource
-   * @return Array containing unit and value. [0]=unit, [1]=value
-   * @throws IllegalArgumentException if units contain non alpha characters
-   */
-  private static String[] parseResourceValue(String resourceValue) {
-    String[] resource = new String[2];
-    int i = 0;
-    for (; i < resourceValue.length(); i++) {
-      if (Character.isAlphabetic(resourceValue.charAt(i))) {
-        break;
-      }
-    }
-    String units = resourceValue.substring(i);
-
-    if (StringUtils.isAlpha(units) || units.equals("")) {
-      resource[0] = units;
-      resource[1] = resourceValue.substring(0, i);
-      return resource;
-    } else {
-      throw new IllegalArgumentException("Units '" + units + "'"
-          + " contains non alphabet characters, which is not allowed.");
-    }
-  }
-
-}
diff --git a/submarine-commons/commons-runtime/src/test/java/org/apache/submarine/commons/runtime/MockClientContext.java b/submarine-commons/commons-runtime/src/test/java/org/apache/submarine/commons/runtime/MockClientContext.java
deleted file mode 100644
index 4f55bf7..0000000
--- a/submarine-commons/commons-runtime/src/test/java/org/apache/submarine/commons/runtime/MockClientContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime;
-
-import org.apache.submarine.commons.runtime.fs.MockRemoteDirectoryManager;
-import org.apache.submarine.commons.runtime.fs.RemoteDirectoryManager;
-
-import java.io.IOException;
-
-public class MockClientContext extends ClientContext {
-
-  private RemoteDirectoryManager remoteDirectoryMgr;
-
-  public MockClientContext(String jobName) throws IOException {
-    this.remoteDirectoryMgr = new MockRemoteDirectoryManager(jobName);
-  }
-
-  @Override
-  public RemoteDirectoryManager getRemoteDirectoryManager() {
-    return remoteDirectoryMgr;
-  }
-
-  public void setRemoteDirectoryMgr(
-      RemoteDirectoryManager remoteDirectoryMgr) {
-    this.remoteDirectoryMgr = remoteDirectoryMgr;
-  }
-}
diff --git a/submarine-commons/commons-runtime/src/test/java/org/apache/submarine/commons/runtime/fs/MockRemoteDirectoryManager.java b/submarine-commons/commons-runtime/src/test/java/org/apache/submarine/commons/runtime/fs/MockRemoteDirectoryManager.java
deleted file mode 100644
index 13fa78e..0000000
--- a/submarine-commons/commons-runtime/src/test/java/org/apache/submarine/commons/runtime/fs/MockRemoteDirectoryManager.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime.fs;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.Time;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Objects;
-
-public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
-
-  private static final String FAILED_TO_CREATE_DIRS_FORMAT_STRING =
-      "Failed to create directories under path: %s";
-  private static final String JOB_NAME_MUST_NOT_BE_NULL =
-      "Job name must not be null!";
-  private static final File STAGING_AREA = new File("target/_staging_area_");
-
-  private File jobsParentDir;
-  private File jobDir;
-  private File modelParentDir;
-
-  public MockRemoteDirectoryManager(String jobName) throws IOException {
-    Objects.requireNonNull(jobName, JOB_NAME_MUST_NOT_BE_NULL);
-    this.cleanup();
-    this.jobsParentDir = initializeJobParentDir();
-    this.jobDir = initializeJobDir(jobName);
-    this.modelParentDir = initializeModelParentDir();
-  }
-
-  private void cleanup() throws IOException {
-    FileUtils.deleteDirectory(STAGING_AREA);
-  }
-
-  private File initializeJobParentDir() throws IOException {
-    File dir = new File(STAGING_AREA, String.valueOf(Time.monotonicNow()));
-    if (!dir.mkdirs()) {
-      throw new IOException(
-          String.format(FAILED_TO_CREATE_DIRS_FORMAT_STRING,
-              dir.getAbsolutePath()));
-    }
-    return dir;
-  }
-
-  private File initializeJobDir(String jobName) throws IOException {
-    Objects.requireNonNull(jobsParentDir, "Job parent dir must not be null!");
-    File dir = new File(jobsParentDir.getAbsolutePath(), jobName);
-
-    if (!dir.exists() && !dir.mkdirs()) {
-      throw new IOException(
-          String.format(FAILED_TO_CREATE_DIRS_FORMAT_STRING,
-              dir.getAbsolutePath()));
-    }
-    return dir;
-  }
-
-  private File initializeModelParentDir() throws IOException {
-    File dir = new File(
-        "target/_models_" + System.currentTimeMillis());
-    if (!dir.exists() && !dir.mkdirs()) {
-      throw new IOException(
-          String.format(FAILED_TO_CREATE_DIRS_FORMAT_STRING,
-              dir.getAbsolutePath()));
-    }
-    return dir;
-  }
-
-  @Override
-  public Path getJobStagingArea(String jobName, boolean create)
-      throws IOException {
-    Objects.requireNonNull(jobName, JOB_NAME_MUST_NOT_BE_NULL);
-    Objects.requireNonNull(jobDir, JOB_NAME_MUST_NOT_BE_NULL);
-    this.jobDir = initializeJobDir(jobName);
-    if (create && !jobDir.exists()) {
-      if (!jobDir.mkdirs()) {
-        throw new IOException(
-            String.format(FAILED_TO_CREATE_DIRS_FORMAT_STRING,
-                jobDir.getAbsolutePath()));
-      }
-    }
-    return new Path(jobDir.getAbsolutePath());
-  }
-
-  @Override
-  public Path getJobCheckpointDir(String jobName, boolean create)
-      throws IOException {
-    return new Path("s3://generated_checkpoint_dir");
-  }
-
-  @Override
-  public Path getModelDir(String modelName, boolean create)
-      throws IOException {
-    Objects.requireNonNull(modelParentDir, "Model parent dir must not be null!");
-    File modelDir = new File(modelParentDir.getAbsolutePath(), modelName);
-    if (create) {
-      if (!modelDir.exists() && !modelDir.mkdirs()) {
-        throw new IOException("Failed to mkdirs for "
-            + modelDir.getAbsolutePath());
-      }
-    }
-    return new Path(modelDir.getAbsolutePath());
-  }
-
-  @Override
-  public FileSystem getDefaultFileSystem() throws IOException {
-    return FileSystem.getLocal(new Configuration());
-  }
-
-  @Override
-  public FileSystem getFileSystemByUri(String uri) throws IOException {
-    return getDefaultFileSystem();
-  }
-
-  @Override
-  public Path getUserRootFolder() throws IOException {
-    return new Path("s3://generated_root_dir");
-  }
-
-  @Override
-  public boolean isDir(String uri) throws IOException {
-    return getDefaultFileSystem().getFileStatus(
-        new Path(convertToStagingPath(uri))).isDirectory();
-  }
-
-  @Override
-  public boolean isRemote(String uri) throws IOException {
-    String scheme = new Path(uri).toUri().getScheme();
-    if (null == scheme) {
-      return false;
-    }
-    return !scheme.startsWith("file://");
-  }
-
-  private String convertToStagingPath(String uri) throws IOException {
-    if (isRemote(uri)) {
-      String dirName = new Path(uri).getName();
-      return this.jobDir.getAbsolutePath()
-          + "/" + dirName;
-    }
-    return uri;
-  }
-
-  /**
-   * We use staging dir as mock HDFS dir.
-   * */
-  @Override
-  public boolean copyRemoteToLocal(String remoteUri, String localUri)
-      throws IOException {
-    // mock the copy from HDFS into a local copy
-    Path remoteToLocalDir = new Path(convertToStagingPath(remoteUri));
-    File old = new File(convertToStagingPath(localUri));
-    if (old.isDirectory() && old.exists()) {
-      if (!FileUtil.fullyDelete(old)) {
-        throw new IOException("Cannot delete temp dir:"
-            + old.getAbsolutePath());
-      }
-    }
-    return FileUtil.copy(getDefaultFileSystem(), remoteToLocalDir,
-        new File(localUri), false,
-        getDefaultFileSystem().getConf());
-  }
-
-  @Override
-  public boolean existsRemoteFile(String uri) throws IOException {
-    String dirName = new Path(uri).getName();
-    String fakeLocalFilePath = this.jobDir.getAbsolutePath()
-        + "/" + dirName;
-    return new File(fakeLocalFilePath).exists();
-  }
-
-  @Override
-  public FileStatus getRemoteFileStatus(String uri) throws IOException {
-    return getDefaultFileSystem().getFileStatus(new Path(
-        convertToStagingPath(uri)));
-  }
-
-  @Override
-  public long getRemoteFileSize(String uri) throws IOException {
-    // 5 byte for this file to test
-    if (uri.equals("https://a/b/1.patch")) {
-      return 5;
-    }
-    return 100 * 1024 * 1024;
-  }
-
-  public void setJobDir(File jobDir) {
-    this.jobDir = jobDir;
-  }
-}
diff --git a/submarine-dist/pom.xml b/submarine-dist/pom.xml
index c0fdb9e..19dbefc 100644
--- a/submarine-dist/pom.xml
+++ b/submarine-dist/pom.xml
@@ -33,16 +33,6 @@
   <name>Submarine: Dist</name>
   <packaging>pom</packaging>
 
-  <dependencyManagement>
-    <dependencies>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-yarn-services-core</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-    </dependencies>
-  </dependencyManagement>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.submarine</groupId>
@@ -104,7 +94,7 @@
               <goal>single</goal>
             </goals>
             <configuration>
-              <finalName>${project.artifactId}-${project.version}-${profile-id}</finalName>
+              <finalName>${project.artifactId}-${project.version}</finalName>
               <appendAssemblyId>false</appendAssemblyId>
               <attach>false</attach>
               <descriptors>
diff --git a/submarine-test/test-e2e/pom.xml b/submarine-test/test-e2e/pom.xml
index 4d02c2c..3705fa8 100644
--- a/submarine-test/test-e2e/pom.xml
+++ b/submarine-test/test-e2e/pom.xml
@@ -181,7 +181,7 @@
       </activation>
       <properties>
         <submarine.daemon.package.base>
-          ../../submarine-dist/target/submarine-dist-${project.version}-hadoop-2.9/submarine-dist-${project.version}-hadoop-2.9/bin
+          ../../submarine-dist/target/submarine-dist-${project.version}/submarine-dist-${project.version}/bin
         </submarine.daemon.package.base>
       </properties>
     </profile>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org