You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/13 08:53:12 UTC

[1/3] hadoop git commit: YARN-8561. [Submarine] Initial implementation: Training job submission and job history retrieval. Contributed by Wangda Tan.

Repository: hadoop
Updated Branches:
  refs/heads/trunk a8dae0047 -> cadbc8b57


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/YarnServiceCliTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/YarnServiceCliTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/YarnServiceCliTestUtils.java
new file mode 100644
index 0000000..e4825ea
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/YarnServiceCliTestUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli.yarnservice;
+
+import org.apache.hadoop.yarn.submarine.common.MockClientContext;
+import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
+import org.apache.hadoop.yarn.submarine.runtimes.common.MemorySubmarineStorage;
+import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceRuntimeFactory;
+
+public class YarnServiceCliTestUtils {
+  public static MockClientContext getMockClientContext() {
+    MockClientContext mockClientContext = new MockClientContext();
+    RuntimeFactory runtimeFactory = new YarnServiceRuntimeFactory(
+        mockClientContext);
+    mockClientContext.setRuntimeFactory(runtimeFactory);
+    runtimeFactory.setSubmarineStorage(new MemorySubmarineStorage());
+    return mockClientContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java
new file mode 100644
index 0000000..5c06ddc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.common;
+
+import org.apache.hadoop.yarn.submarine.common.fs.MockRemoteDirectoryManager;
+import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MockClientContext extends ClientContext {
+  private MockRemoteDirectoryManager remoteDirectoryMgr =
+      new MockRemoteDirectoryManager();
+
+  @Override
+  public RemoteDirectoryManager getRemoteDirectoryManager() {
+    return remoteDirectoryMgr;
+  }
+
+  @Override
+  public synchronized YarnClient getOrCreateYarnClient() {
+    YarnClient client = mock(YarnClient.class);
+    try {
+      when(client.getResourceTypeInfo()).thenReturn(
+          ResourceUtils.getResourcesTypeInfo());
+    } catch (YarnException e) {
+      fail(e.getMessage());
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
+    return client;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java
new file mode 100644
index 0000000..a195b59
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+
+public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
+  private File jobsParentDir = null;
+  private File modelParentDir = null;
+
+  @Override
+  public Path getJobStagingArea(String jobName, boolean create)
+      throws IOException {
+    if (jobsParentDir == null && create) {
+      jobsParentDir = new File(
+          "target/_staging_area_" + System.currentTimeMillis());
+      if (!jobsParentDir.mkdirs()) {
+        throw new IOException(
+            "Failed to mkdirs for" + jobsParentDir.getAbsolutePath());
+      }
+    }
+
+    File jobDir = new File(jobsParentDir.getAbsolutePath(), jobName);
+    if (create && !jobDir.exists()) {
+      if (!jobDir.mkdirs()) {
+        throw new IOException("Failed to mkdirs for " + jobDir.getAbsolutePath());
+      }
+    }
+    return new Path(jobDir.getAbsolutePath());
+  }
+
+  @Override
+  public Path getJobCheckpointDir(String jobName, boolean create)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public Path getModelDir(String modelName, boolean create) throws IOException {
+    if (modelParentDir == null && create) {
+      modelParentDir = new File(
+          "target/_models_" + System.currentTimeMillis());
+      if (!modelParentDir.mkdirs()) {
+        throw new IOException(
+            "Failed to mkdirs for " + modelParentDir.getAbsolutePath());
+      }
+    }
+
+    File modelDir = new File(modelParentDir.getAbsolutePath(), modelName);
+    if (create) {
+      if (!modelDir.exists() && !modelDir.mkdirs()) {
+        throw new IOException("Failed to mkdirs for " + modelDir.getAbsolutePath());
+      }
+    }
+    return new Path(modelDir.getAbsolutePath());
+  }
+
+  @Override
+  public FileSystem getFileSystem() throws IOException {
+    return FileSystem.getLocal(new Configuration());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/runtimes/common/MemorySubmarineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/runtimes/common/MemorySubmarineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/runtimes/common/MemorySubmarineStorage.java
new file mode 100644
index 0000000..013614e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/runtimes/common/MemorySubmarineStorage.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.common;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MemorySubmarineStorage extends SubmarineStorage {
+  private Map<String, Map<String, String>> jobsInfo = new HashMap<>();
+  private Map<String, Map<String, Map<String, String>>> modelsInfo =
+      new HashMap<>();
+
+  @Override
+  public synchronized void addNewJob(String jobName, Map<String, String> jobInfo)
+      throws IOException {
+    jobsInfo.put(jobName, jobInfo);
+  }
+
+  @Override
+  public synchronized Map<String, String> getJobInfoByName(String jobName)
+      throws IOException {
+    Map<String, String> info = jobsInfo.get(jobName);
+    if (info == null) {
+      throw new IOException("Failed to find job=" + jobName);
+    }
+    return info;
+  }
+
+  @Override
+  public synchronized void addNewModel(String modelName, String version,
+      Map<String, String> modelInfo) throws IOException {
+    if (!modelsInfo.containsKey(modelName)) {
+      modelsInfo.put(modelName, new HashMap<>());
+    }
+    modelsInfo.get(modelName).put(version, modelInfo);
+  }
+
+  @Override
+  public synchronized Map<String, String> getModelInfoByName(String modelName,
+      String version) throws IOException {
+
+    boolean notFound = false;
+    Map<String, String> info = null;
+    try {
+       info = modelsInfo.get(modelName).get(version);
+    } catch (NullPointerException e) {
+      notFound = true;
+    }
+
+    if (notFound || info == null) {
+      throw new IOException(
+          "Failed to find, model=" + modelName + " version=" + version);
+    }
+
+    return info;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/runtimes/common/TestFSBasedSubmarineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/runtimes/common/TestFSBasedSubmarineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/runtimes/common/TestFSBasedSubmarineStorage.java
new file mode 100644
index 0000000..52a68b3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/runtimes/common/TestFSBasedSubmarineStorage.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.common;
+
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.fs.MockRemoteDirectoryManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestFSBasedSubmarineStorage {
+  private Map<String, String> getMap(String prefix) {
+    Map<String, String> map = new HashMap<>();
+    map.put(prefix + "1", "1");
+    map.put(prefix + "2", "2");
+    map.put(prefix + "3", "3");
+    map.put(prefix + "4", "4");
+    return map;
+  }
+
+  private void compareMap(Map<String, String> map1, Map<String, String> map2) {
+    Assert.assertEquals(map1.size(), map2.size());
+    for (String k : map1.keySet()) {
+      Assert.assertEquals(map1.get(k), map2.get(k));
+    }
+  }
+
+  @Test
+  public void testStorageOps() throws IOException {
+    MockRemoteDirectoryManager remoteDirectoryManager = new MockRemoteDirectoryManager();
+    ClientContext clientContext = mock(ClientContext.class);
+    when(clientContext.getRemoteDirectoryManager()).thenReturn(remoteDirectoryManager);
+    FSBasedSubmarineStorageImpl storage = new FSBasedSubmarineStorageImpl(
+        clientContext);
+    storage.addNewJob("job1", getMap("job1"));
+    storage.addNewJob("job2", getMap("job2"));
+    storage.addNewJob("job3", getMap("job3"));
+    storage.addNewJob("job4", new HashMap<>());
+    storage.addNewModel("model1", "1.0", getMap("model1_1.0"));
+    storage.addNewModel("model1", "2.0.0", getMap("model1_2.0.0"));
+    storage.addNewModel("model2", null, getMap("model1_default"));
+    storage.addNewModel("model2", "1.0", getMap("model2_1.0"));
+
+    // create a new storage and read it back.
+    storage = new FSBasedSubmarineStorageImpl(
+        clientContext);
+    compareMap(getMap("job1"), storage.getJobInfoByName("job1"));
+    compareMap(getMap("job2"), storage.getJobInfoByName("job2"));
+    compareMap(getMap("job3"), storage.getJobInfoByName("job3"));
+    compareMap(new HashMap<>(), storage.getJobInfoByName("job4"));
+    compareMap(getMap("model1_1.0"), storage.getModelInfoByName("model1", "1.0"));
+    compareMap(getMap("model1_2.0.0"), storage.getModelInfoByName("model1", "2.0.0"));
+    compareMap(getMap("model2_1.0"), storage.getModelInfoByName("model2", "1.0"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/TestTFConfigGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/TestTFConfigGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/TestTFConfigGenerator.java
new file mode 100644
index 0000000..d7dc874
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/TestTFConfigGenerator.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
+
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTFConfigGenerator {
+  @Test
+  public void testSimpleDistributedTFConfigGenerator() throws JSONException {
+    String json = YarnServiceUtils.getTFConfigEnv("worker", 5, 3, "wtan",
+        "tf-job-001", "example.com");
+    String expected =
+        "{\\\"cluster\\\":{\\\"master\\\":[\\\"master-0.wtan.tf-job-001.example.com:8000\\\"],\\\"worker\\\":[\\\"worker-0.wtan.tf-job-001.example.com:8000\\\",\\\"worker-1.wtan.tf-job-001.example.com:8000\\\",\\\"worker-2.wtan.tf-job-001.example.com:8000\\\",\\\"worker-3.wtan.tf-job-001.example.com:8000\\\"],\\\"ps\\\":[\\\"ps-0.wtan.tf-job-001.example.com:8000\\\",\\\"ps-1.wtan.tf-job-001.example.com:8000\\\",\\\"ps-2.wtan.tf-job-001.example.com:8000\\\"]},\\\"task\\\":{ \\\"type\\\":\\\"worker\\\", \\\"index\\\":$_TASK_INDEX},\\\"environment\\\":\\\"cloud\\\"}";
+    Assert.assertEquals(expected, json);
+
+    json = YarnServiceUtils.getTFConfigEnv("ps", 5, 3, "wtan", "tf-job-001",
+        "example.com");
+    expected =
+        "{\\\"cluster\\\":{\\\"master\\\":[\\\"master-0.wtan.tf-job-001.example.com:8000\\\"],\\\"worker\\\":[\\\"worker-0.wtan.tf-job-001.example.com:8000\\\",\\\"worker-1.wtan.tf-job-001.example.com:8000\\\",\\\"worker-2.wtan.tf-job-001.example.com:8000\\\",\\\"worker-3.wtan.tf-job-001.example.com:8000\\\"],\\\"ps\\\":[\\\"ps-0.wtan.tf-job-001.example.com:8000\\\",\\\"ps-1.wtan.tf-job-001.example.com:8000\\\",\\\"ps-2.wtan.tf-job-001.example.com:8000\\\"]},\\\"task\\\":{ \\\"type\\\":\\\"ps\\\", \\\"index\\\":$_TASK_INDEX},\\\"environment\\\":\\\"cloud\\\"}";
+    Assert.assertEquals(expected, json);
+
+    json = YarnServiceUtils.getTFConfigEnv("master", 2, 1, "wtan", "tf-job-001",
+        "example.com");
+    expected =
+        "{\\\"cluster\\\":{\\\"master\\\":[\\\"master-0.wtan.tf-job-001.example.com:8000\\\"],\\\"worker\\\":[\\\"worker-0.wtan.tf-job-001.example.com:8000\\\"],\\\"ps\\\":[\\\"ps-0.wtan.tf-job-001.example.com:8000\\\"]},\\\"task\\\":{ \\\"type\\\":\\\"master\\\", \\\"index\\\":$_TASK_INDEX},\\\"environment\\\":\\\"cloud\\\"}";
+    Assert.assertEquals(expected, json);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/resources/core-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/resources/core-site.xml
new file mode 100644
index 0000000..50ec146
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/resources/core-site.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/resources/hdfs-site.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/resources/hdfs-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/resources/hdfs-site.xml
new file mode 100644
index 0000000..50ec146
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/resources/hdfs-site.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml
index 490e9ad..4c03f3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml
@@ -37,6 +37,7 @@
     <module>hadoop-yarn-applications-distributedshell</module>
     <module>hadoop-yarn-applications-unmanaged-am-launcher</module>
     <module>hadoop-yarn-services</module>
+    <module>hadoop-yarn-submarine</module>
   </modules>
 
  <profiles>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/3] hadoop git commit: YARN-8561. [Submarine] Initial implementation: Training job submission and job history retrieval. Contributed by Wangda Tan.

Posted by su...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineException.java
new file mode 100644
index 0000000..b6a39b9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineException.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.exception;
+
+public class SubmarineException extends Exception {
+  public SubmarineException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineRuntimeException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineRuntimeException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineRuntimeException.java
new file mode 100644
index 0000000..4fb74fd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineRuntimeException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.exception;
+
+public class SubmarineRuntimeException extends RuntimeException {
+  public SubmarineRuntimeException(String s) {
+    super(s);
+  }
+
+  public SubmarineRuntimeException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
new file mode 100644
index 0000000..fe8956a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.fs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+
+import java.io.IOException;
+
+/**
+ * Manages remote directories for staging, log, etc.
+ * TODO, need to properly handle permission / name validation, etc.
+ */
+public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
+  FileSystem fs;
+
+  public DefaultRemoteDirectoryManager(ClientContext context) {
+    try {
+      this.fs = FileSystem.get(context.getYarnConfig());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Path getJobStagingArea(String jobName, boolean create) throws IOException {
+    Path staging = new Path(getJobRootFolder(jobName), "staging");
+    if (create) {
+      createFolderIfNotExist(staging);
+    }
+    return staging;
+  }
+
+  @Override
+  public Path getJobCheckpointDir(String jobName, boolean create)
+      throws IOException {
+    Path jobDir = new Path(getJobStagingArea(jobName, create),
+        CliConstants.CHECKPOINT_PATH);
+    if (create) {
+      createFolderIfNotExist(jobDir);
+    }
+    return jobDir;
+  }
+
+  @Override
+  public Path getModelDir(String modelName, boolean create) throws IOException {
+    Path modelDir = new Path(new Path("submarine", "models"), modelName);
+    if (create) {
+      createFolderIfNotExist(modelDir);
+    }
+    return modelDir;
+  }
+
+  @Override
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+
+  private Path getJobRootFolder(String jobName) throws IOException {
+    return new Path(new Path("submarine", "jobs"), jobName);
+  }
+
+  private void createFolderIfNotExist(Path path) throws IOException {
+    if (!fs.exists(path)) {
+      if (!fs.mkdirs(path)) {
+        throw new IOException("Failed to create folder=" + path);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java
new file mode 100644
index 0000000..132b314
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.fs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+public interface RemoteDirectoryManager {
+  Path getJobStagingArea(String jobName, boolean create) throws IOException;
+
+  Path getJobCheckpointDir(String jobName, boolean create) throws IOException;
+
+  Path getModelDir(String modelName, boolean create) throws IOException;
+
+  FileSystem getFileSystem() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/RuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/RuntimeFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/RuntimeFactory.java
new file mode 100644
index 0000000..9c164c6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/RuntimeFactory.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration;
+import org.apache.hadoop.yarn.submarine.common.exception.SubmarineRuntimeException;
+import org.apache.hadoop.yarn.submarine.runtimes.common.FSBasedSubmarineStorageImpl;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
+import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
+import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceJobMonitor;
+import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceJobSubmitter;
+
+import java.lang.reflect.InvocationTargetException;
+
+public abstract class RuntimeFactory {
+  protected ClientContext clientContext;
+  private JobSubmitter jobSubmitter;
+  private JobMonitor jobMonitor;
+  private SubmarineStorage submarineStorage;
+
+  public RuntimeFactory(ClientContext clientContext) {
+    this.clientContext = clientContext;
+  }
+
+  public static RuntimeFactory getRuntimeFactory(
+      ClientContext clientContext) {
+    Configuration submarineConfiguration =
+        clientContext.getSubmarineConfig();
+    String runtimeClass = submarineConfiguration.get(
+        SubmarineConfiguration.RUNTIME_CLASS,
+        SubmarineConfiguration.DEFAULT_RUNTIME_CLASS);
+
+    try {
+      Class<?> runtimeClazz = Class.forName(runtimeClass);
+      if (RuntimeFactory.class.isAssignableFrom(runtimeClazz)) {
+        return (RuntimeFactory) runtimeClazz.getConstructor(ClientContext.class).newInstance(clientContext);
+      } else {
+        throw new SubmarineRuntimeException("Class: " + runtimeClass
+            + " not instance of " + RuntimeFactory.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException | IllegalAccessException |
+             InstantiationException | NoSuchMethodException |
+             InvocationTargetException e) {
+      throw new SubmarineRuntimeException(
+          "Could not instantiate RuntimeFactory: " + runtimeClass, e);
+    }
+  }
+
+  protected abstract JobSubmitter internalCreateJobSubmitter();
+
+  protected abstract JobMonitor internalCreateJobMonitor();
+
+  protected abstract SubmarineStorage internalCreateSubmarineStorage();
+
+  public synchronized JobSubmitter getJobSubmitterInstance() {
+    if (jobSubmitter == null) {
+      jobSubmitter = internalCreateJobSubmitter();
+    }
+    return jobSubmitter;
+  }
+
+  public synchronized JobMonitor getJobMonitorInstance() {
+    if (jobMonitor == null) {
+      jobMonitor = internalCreateJobMonitor();
+    }
+    return jobMonitor;
+  }
+
+  public synchronized SubmarineStorage getSubmarineStorage() {
+    if (submarineStorage == null) {
+      submarineStorage = internalCreateSubmarineStorage();
+    }
+    return submarineStorage;
+  }
+
+  @VisibleForTesting
+  public synchronized void setJobSubmitterInstance(JobSubmitter jobSubmitter) {
+    this.jobSubmitter = jobSubmitter;
+  }
+
+  @VisibleForTesting
+  public synchronized void setJobMonitorInstance(JobMonitor jobMonitor) {
+    this.jobMonitor = jobMonitor;
+  }
+
+  @VisibleForTesting
+  public synchronized void setSubmarineStorage(SubmarineStorage storage) {
+    this.submarineStorage = storage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java
new file mode 100644
index 0000000..ebf9581
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+
+package org.apache.hadoop.yarn.submarine.runtimes.common;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+
+/**
+ * A super naive FS-based storage.
+ */
+public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
+  ClientContext clientContext;
+  RemoteDirectoryManager rdm;
+
+  public FSBasedSubmarineStorageImpl(ClientContext clientContext) {
+    this.clientContext = clientContext;
+    rdm = clientContext.getRemoteDirectoryManager();
+  }
+
+  @Override
+  public void addNewJob(String jobName, Map<String, String> jobInfo)
+      throws IOException {
+    Path jobInfoPath = getJobInfoPath(jobName, true);
+    FSDataOutputStream fos = rdm.getFileSystem().create(jobInfoPath);
+    serializeMap(fos, jobInfo);
+  }
+
+  @Override
+  public Map<String, String> getJobInfoByName(String jobName)
+      throws IOException {
+    Path jobInfoPath = getJobInfoPath(jobName, false);
+    FSDataInputStream fis = rdm.getFileSystem().open(jobInfoPath);
+    return deserializeMap(fis);
+  }
+
+  @Override
+  public void addNewModel(String modelName, String version,
+      Map<String, String> modelInfo) throws IOException {
+    Path modelInfoPath = getModelInfoPath(modelName, version, true);
+    FSDataOutputStream fos = rdm.getFileSystem().create(modelInfoPath);
+    serializeMap(fos, modelInfo);
+  }
+
+  @Override
+  public Map<String, String> getModelInfoByName(String modelName,
+      String version) throws IOException {
+    Path modelInfoPath = getModelInfoPath(modelName, version, false);
+    FSDataInputStream fis = rdm.getFileSystem().open(modelInfoPath);
+    return deserializeMap(fis);
+  }
+
+  private Path getModelInfoPath(String modelName, String version, boolean create)
+      throws IOException {
+    Path modelDir = rdm.getModelDir(modelName, create);
+    Path modelInfo = new Path(modelDir, version + ".info");
+    return modelInfo;
+  }
+
+  private void serializeMap(FSDataOutputStream fos, Map<String, String> map)
+      throws IOException {
+    ObjectOutput oo = new ObjectOutputStream(fos);
+    oo.writeObject(map);
+    oo.close();
+  }
+
+  private Map<String, String> deserializeMap(FSDataInputStream fis)
+      throws IOException {
+    ObjectInput oi = new ObjectInputStream(fis);
+    Map<String, String> newMap = null;
+    try {
+      newMap = (Map<String, String>) oi.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+    return newMap;
+  }
+
+  private Path getJobInfoPath(String jobName, boolean create) throws IOException {
+    Path path = rdm.getJobStagingArea(jobName, create);
+    Path fileName = new Path(path, "job.info");
+    return fileName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java
new file mode 100644
index 0000000..c81393b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.common;
+
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.api.JobState;
+import org.apache.hadoop.yarn.submarine.common.api.JobStatus;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Monitor status of job(s)
+ */
+public abstract class JobMonitor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(JobMonitor.class);
+  protected ClientContext clientContext;
+
+  public JobMonitor(ClientContext clientContext) {
+    this.clientContext = clientContext;
+  }
+
+  /**
+   * Returns status of training job.
+   *
+   * @param jobName name of job
+   * @return job status
+   * @throws IOException anything else happens
+   * @throws YarnException anything related to YARN happens
+   */
+  public abstract JobStatus getTrainingJobStatus(String jobName)
+      throws IOException, YarnException;
+
+  /**
+   * Continue wait and print status if job goes to ready or final state.
+   * @param jobName
+   * @throws IOException
+   * @throws YarnException
+   * @throws SubmarineException
+   */
+  public void waitTrainingFinal(String jobName)
+      throws IOException, YarnException, SubmarineException {
+    // Wait 5 sec between each fetch.
+    int waitIntervalSec = 5;
+    JobStatus js;
+    while (true) {
+      js = getTrainingJobStatus(jobName);
+      JobState jobState = js.getState();
+      js.nicePrint(System.err);
+
+      if (JobState.isFinal(jobState)) {
+        if (jobState.equals(JobState.FAILED)) {
+          throw new SubmarineException("Job failed");
+        } else if (jobState.equals(JobState.KILLED)) {
+          throw new SubmarineException("Job killed");
+        }
+        LOG.info("Job exited with state=" + jobState);
+        break;
+      }
+
+      try {
+        Thread.sleep(waitIntervalSec * 1000);
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobSubmitter.java
new file mode 100644
index 0000000..1749390
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobSubmitter.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.common;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
+
+import java.io.IOException;
+
+/**
+ * Submit job to cluster master
+ */
+public interface JobSubmitter {
+  /**
+   * Submit job to cluster
+   * @param parameters run job parameters
+   * @return applicatioId when successfully submitted
+   * @throws YarnException for issues while contacting YARN daemons
+   * @throws IOException for other issues.
+   */
+  ApplicationId submitJob(RunJobParameters parameters)
+      throws IOException, YarnException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/StorageKeyConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/StorageKeyConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/StorageKeyConstants.java
new file mode 100644
index 0000000..1fbbe7a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/StorageKeyConstants.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.common;
+
+public class StorageKeyConstants {
+  public static final String JOB_NAME = "JOB_NAME";
+  public static final String JOB_RUN_ARGS = "JOB_RUN_ARGS";
+  public static final String APPLICATION_ID = "APPLICATION_ID";
+  public static final String CHECKPOINT_PATH = "CHECKPOINT_PATH";
+  public static final String INPUT_PATH = "INPUT_PATH";
+  public static final String SAVED_MODEL_PATH = "SAVED_MODEL_PATH";
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/SubmarineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/SubmarineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/SubmarineStorage.java
new file mode 100644
index 0000000..9c2004f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/SubmarineStorage.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.common;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Persistent job/model, etc.
+ */
+public abstract class SubmarineStorage {
+  /**
+   * Add a new job by name
+   * @param jobName name of job.
+   * @param jobInfo info of the job.
+   */
+  public abstract void addNewJob(String jobName, Map<String, String> jobInfo)
+      throws IOException;
+
+  /**
+   * Get job info by job name.
+   * @param jobName name of job
+   * @return info of the job.
+   */
+  public abstract Map<String, String> getJobInfoByName(String jobName)
+      throws IOException;
+
+  /**
+   * Add a new model
+   * @param modelName name of model
+   * @param version version of the model, when null is specified, it will be
+   *                "default"
+   * @param modelInfo info of the model.
+   */
+  public abstract void addNewModel(String modelName, String version,
+      Map<String, String> modelInfo) throws IOException;
+
+  /**
+   * Get model info by name and version.
+   *  @param modelName name of model.
+   * @param version version of the model, when null is specifed, it will be
+   */
+  public abstract Map<String, String> getModelInfoByName(String modelName, String version)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java
new file mode 100644
index 0000000..94d30b0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.api.JobStatus;
+import org.apache.hadoop.yarn.submarine.common.api.builder.JobStatusBuilder;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
+
+import java.io.IOException;
+
+public class YarnServiceJobMonitor extends JobMonitor {
+  private ServiceClient serviceClient = null;
+
+  public YarnServiceJobMonitor(ClientContext clientContext) {
+    super(clientContext);
+  }
+
+  @Override
+  public synchronized JobStatus getTrainingJobStatus(String jobName)
+      throws IOException, YarnException {
+    if (this.serviceClient == null) {
+      this.serviceClient = YarnServiceUtils.createServiceClient(
+          clientContext.getYarnConfig());
+    }
+
+    Service serviceSpec = this.serviceClient.getStatus(jobName);
+    JobStatus jobStatus = JobStatusBuilder.fromServiceSpec(serviceSpec);
+    return jobStatus;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
new file mode 100644
index 0000000..3cd0d7e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
@@ -0,0 +1,458 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
+import org.apache.hadoop.yarn.service.api.records.Artifact;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile;
+import org.apache.hadoop.yarn.service.api.records.Resource;
+import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.Envs;
+import org.apache.hadoop.yarn.submarine.common.api.TaskType;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
+/**
+ * Submit a job to cluster
+ */
+public class YarnServiceJobSubmitter implements JobSubmitter {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(YarnServiceJobSubmitter.class);
+  ClientContext clientContext;
+  Service serviceSpec;
+  private Set<Path> uploadedFiles = new HashSet<>();
+
+  public YarnServiceJobSubmitter(ClientContext clientContext) {
+    this.clientContext = clientContext;
+  }
+
+  private Resource getServiceResourceFromYarnResource(
+      org.apache.hadoop.yarn.api.records.Resource yarnResource) {
+    Resource serviceResource = new Resource();
+    serviceResource.setCpus(yarnResource.getVirtualCores());
+    serviceResource.setMemory(String.valueOf(yarnResource.getMemorySize()));
+
+    Map<String, ResourceInformation> riMap = new HashMap<>();
+    for (org.apache.hadoop.yarn.api.records.ResourceInformation ri : yarnResource
+        .getAllResourcesListCopy()) {
+      ResourceInformation serviceRi =
+          new ResourceInformation();
+      serviceRi.setValue(ri.getValue());
+      serviceRi.setUnit(ri.getUnits());
+      riMap.put(ri.getName(), serviceRi);
+    }
+    serviceResource.setResourceInformations(riMap);
+
+    return serviceResource;
+  }
+
+  private String getValueOfEnvionment(String envar) {
+    // extract value from "key=value" form
+    if (envar == null || !envar.contains("=")) {
+      return "";
+    } else {
+      return envar.substring(envar.indexOf("=") + 1);
+    }
+  }
+
+  private void addHdfsClassPathIfNeeded(RunJobParameters parameters,
+      FileWriter fw, Component comp) throws IOException {
+    // Find envs to use HDFS
+    String hdfsHome = null;
+    String javaHome = null;
+
+    boolean hadoopEnv = false;
+
+    for (String envar : parameters.getEnvars()) {
+      if (envar.startsWith("DOCKER_HADOOP_HDFS_HOME=")) {
+        hdfsHome = getValueOfEnvionment(envar);
+        hadoopEnv = true;
+      } else if (envar.startsWith("DOCKER_JAVA_HOME=")) {
+        javaHome = getValueOfEnvionment(envar);
+      }
+    }
+
+    boolean lackingEnvs = false;
+
+    if ((parameters.getInputPath() != null && parameters.getInputPath()
+        .contains("hdfs://")) || (parameters.getCheckpointPath() != null
+        && parameters.getCheckpointPath().contains("hdfs://")) || (
+        parameters.getSavedModelPath() != null && parameters.getSavedModelPath()
+            .contains("hdfs://")) || hadoopEnv) {
+      // HDFS is asked either in input or output, set LD_LIBRARY_PATH
+      // and classpath
+
+      if (hdfsHome != null) {
+        // Unset HADOOP_HOME/HADOOP_YARN_HOME to make sure host machine's envs
+        // won't pollute docker's env.
+        fw.append("export HADOOP_HOME=\n");
+        fw.append("export HADOOP_YARN_HOME=\n");
+        fw.append("export HADOOP_HDFS_HOME=" + hdfsHome + "\n");
+      } else{
+        lackingEnvs = true;
+      }
+
+      // hadoop confs will be uploaded to HDFS and localized to container's
+      // local folder, so here set $HADOOP_CONF_DIR to $WORK_DIR.
+      fw.append("export HADOOP_CONF_DIR=$WORK_DIR\n");
+      if (javaHome != null) {
+        fw.append("export JAVA_HOME=" + javaHome + "\n");
+        fw.append("export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:"
+            + "$JAVA_HOME/lib/amd64/server\n");
+      } else {
+        lackingEnvs = true;
+      }
+      fw.append("export CLASSPATH=`$HADOOP_HDFS_HOME/bin/hadoop classpath --glob`\n");
+    }
+
+    if (lackingEnvs) {
+      LOG.error("When hdfs is being used to read/write models/data. Following"
+          + "envs are required: 1) DOCKER_HADOOP_HDFS_HOME=<HDFS_HOME inside"
+          + "docker container> 2) DOCKER_JAVA_HOME=<JAVA_HOME inside docker"
+          + "container>. You can use --env to pass these envars.");
+      throw new IOException("Failed to detect HDFS-related environments.");
+    }
+
+    // Trying to upload core-site.xml and hdfs-site.xml
+    Path stagingDir =
+        clientContext.getRemoteDirectoryManager().getJobStagingArea(
+            parameters.getName(), true);
+    File coreSite = findFileOnClassPath("core-site.xml");
+    File hdfsSite = findFileOnClassPath("hdfs-site.xml");
+    if (coreSite == null || hdfsSite == null) {
+      LOG.error("hdfs is being used, however we couldn't locate core-site.xml/"
+          + "hdfs-site.xml from classpath, please double check you classpath"
+          + "setting and make sure they're included.");
+      throw new IOException(
+          "Failed to locate core-site.xml / hdfs-site.xml from class path");
+    }
+    uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir,
+        coreSite.getAbsolutePath(), "core-site.xml", comp);
+    uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir,
+        hdfsSite.getAbsolutePath(), "hdfs-site.xml", comp);
+
+    // DEBUG
+    if (SubmarineLogs.isVerbose()) {
+      fw.append("echo $CLASSPATH\n");
+      fw.append("echo $JAVA_HOME\n");
+      fw.append("echo $LD_LIBRARY_PATH\n");
+      fw.append("echo $HADOOP_HDFS_HOME\n");
+    }
+  }
+
+  private void addCommonEnvironments(Component component, TaskType taskType) {
+    Map<String, String> envs = component.getConfiguration().getEnv();
+    envs.put(Envs.TASK_INDEX_ENV, ServiceApiConstants.COMPONENT_ID);
+    envs.put(Envs.TASK_TYPE_ENV, taskType.name());
+  }
+
+  /*
+   * Generate a command launch script on local disk, returns patch to the script
+   */
+  private String generateCommandLaunchScript(RunJobParameters parameters,
+      TaskType taskType, Component comp) throws IOException {
+    File file = File.createTempFile(taskType.name() + "-launch-script", ".sh");
+    FileWriter fw = new FileWriter(file);
+
+    fw.append("#!/bin/bash\n");
+
+    addHdfsClassPathIfNeeded(parameters, fw, comp);
+
+    // For primary_worker
+    if (taskType == TaskType.PRIMARY_WORKER) {
+      // Do we need tensorboard?
+      if (parameters.isTensorboardEnabled()) {
+        int tensorboardPort = 6006;
+        // Run tensorboard at the background
+        fw.append(
+            "tensorboard --port " + tensorboardPort + " --logdir " + parameters
+                .getCheckpointPath() + " &\n");
+      }
+    }
+
+    // When distributed training is required
+    if (parameters.isDistributed()) {
+      // Generated TF_CONFIG
+      String tfConfigEnv = YarnServiceUtils.getTFConfigEnv(
+          taskType.getComponentName(), parameters.getNumWorkers(),
+          parameters.getNumPS(), parameters.getName(),
+          System.getProperty("user.name"),
+          clientContext.getYarnConfig().get("hadoop.registry.dns.domain-name"));
+      fw.append("export TF_CONFIG=\"" + tfConfigEnv + "\"\n");
+    }
+
+    // Print launch command
+    if (taskType.equals(TaskType.WORKER) || taskType.equals(
+        TaskType.PRIMARY_WORKER)) {
+      fw.append(parameters.getWorkerLaunchCmd() + '\n');
+
+      if (SubmarineLogs.isVerbose()) {
+        LOG.info("Worker command =[" + parameters.getWorkerLaunchCmd() + "]");
+      }
+    } else if (taskType.equals(TaskType.PS)) {
+      fw.append(parameters.getPSLaunchCmd() + '\n');
+
+      if (SubmarineLogs.isVerbose()) {
+        LOG.info("PS command =[" + parameters.getPSLaunchCmd() + "]");
+      }
+    }
+
+    fw.close();
+    return file.getAbsolutePath();
+  }
+
+  private String getScriptFileName(TaskType taskType) {
+    return "run-" + taskType.name() + ".sh";
+  }
+
+  private File findFileOnClassPath(final String fileName) {
+    final String classpath = System.getProperty("java.class.path");
+    final String pathSeparator = System.getProperty("path.separator");
+    final StringTokenizer tokenizer = new StringTokenizer(classpath,
+        pathSeparator);
+
+    while (tokenizer.hasMoreTokens()) {
+      final String pathElement = tokenizer.nextToken();
+      final File directoryOrJar = new File(pathElement);
+      final File absoluteDirectoryOrJar = directoryOrJar.getAbsoluteFile();
+      if (absoluteDirectoryOrJar.isFile()) {
+        final File target = new File(absoluteDirectoryOrJar.getParent(),
+            fileName);
+        if (target.exists()) {
+          return target;
+        }
+      } else{
+        final File target = new File(directoryOrJar, fileName);
+        if (target.exists()) {
+          return target;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  private void uploadToRemoteFileAndLocalizeToContainerWorkDir(Path stagingDir,
+      String fileToUpload, String destFilename, Component comp)
+      throws IOException {
+    FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
+
+    // Upload to remote FS under staging area
+    File localFile = new File(fileToUpload);
+    if (!localFile.exists()) {
+      throw new FileNotFoundException(
+          "Trying to upload file=" + localFile.getAbsolutePath()
+              + " to remote, but couldn't find local file.");
+    }
+    String filename = new File(fileToUpload).getName();
+
+    Path uploadedFilePath = new Path(stagingDir, filename);
+    if (!uploadedFiles.contains(uploadedFilePath)) {
+      if (SubmarineLogs.isVerbose()) {
+        LOG.info("Copying local file=" + fileToUpload + " to remote="
+            + uploadedFilePath);
+      }
+      fs.copyFromLocalFile(new Path(fileToUpload), uploadedFilePath);
+      uploadedFiles.add(uploadedFilePath);
+    }
+
+    FileStatus fileStatus = fs.getFileStatus(uploadedFilePath);
+    LOG.info("Uploaded file path = " + fileStatus.getPath());
+
+    // Set it to component's files list
+    comp.getConfiguration().getFiles().add(new ConfigFile().srcFile(
+        fileStatus.getPath().toUri().toString()).destFile(destFilename)
+        .type(ConfigFile.TypeEnum.STATIC));
+  }
+
+  private void handleLaunchCommand(RunJobParameters parameters,
+      TaskType taskType, Component component) throws IOException {
+    // Get staging area directory
+    Path stagingDir =
+        clientContext.getRemoteDirectoryManager().getJobStagingArea(
+            parameters.getName(), true);
+
+    // Generate script file in the local disk
+    String localScriptFile = generateCommandLaunchScript(parameters, taskType,
+        component);
+    String destScriptFileName = getScriptFileName(taskType);
+    uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir, localScriptFile,
+        destScriptFileName, component);
+
+    component.setLaunchCommand("./" + destScriptFileName);
+  }
+
+  private void addWorkerComponent(Service service,
+      RunJobParameters parameters, TaskType taskType) throws IOException {
+    Component workerComponent = new Component();
+    addCommonEnvironments(workerComponent, taskType);
+
+    workerComponent.setName(taskType.getComponentName());
+
+    if (taskType.equals(TaskType.PRIMARY_WORKER)) {
+      workerComponent.setNumberOfContainers(1L);
+    } else{
+      workerComponent.setNumberOfContainers(
+          (long) parameters.getNumWorkers() - 1);
+    }
+
+    if (parameters.getWorkerDockerImage() != null) {
+      workerComponent.setArtifact(
+          getDockerArtifact(parameters.getWorkerDockerImage()));
+    }
+
+    workerComponent.setResource(
+        getServiceResourceFromYarnResource(parameters.getWorkerResource()));
+    handleLaunchCommand(parameters, taskType, workerComponent);
+    workerComponent.setRestartPolicy(Component.RestartPolicyEnum.NEVER);
+    service.addComponent(workerComponent);
+  }
+
+  // Handle worker and primary_worker.
+  private void addWorkerComponents(Service service, RunJobParameters parameters)
+      throws IOException {
+    addWorkerComponent(service, parameters, TaskType.PRIMARY_WORKER);
+
+    if (parameters.getNumWorkers() > 1) {
+      addWorkerComponent(service, parameters, TaskType.WORKER);
+    }
+  }
+
+  private void appendToEnv(Service service, String key, String value,
+      String delim) {
+    Map<String, String> env = service.getConfiguration().getEnv();
+    if (!env.containsKey(key)) {
+      env.put(key, value);
+    } else {
+      if (!value.isEmpty()) {
+        String existingValue = env.get(key);
+        if (!existingValue.endsWith(delim)) {
+          env.put(key, existingValue + delim + value);
+        } else {
+          env.put(key, existingValue + value);
+        }
+      }
+    }
+  }
+
+  private void handleServiceEnvs(Service service, RunJobParameters parameters) {
+    if (parameters.getEnvars() != null) {
+      for (String envarPair : parameters.getEnvars()) {
+        String key, value;
+        if (envarPair.contains("=")) {
+          int idx = envarPair.indexOf('=');
+          key = envarPair.substring(0, idx);
+          value = envarPair.substring(idx + 1);
+        } else{
+          // No "=" found so use the whole key
+          key = envarPair;
+          value = "";
+        }
+        appendToEnv(service, key, value, ":");
+      }
+    }
+
+    // Append other configs like /etc/passwd, /etc/krb5.conf
+    appendToEnv(service, "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS",
+        "/etc/passwd:/etc/passwd:ro", ",");
+
+    String authenication = clientContext.getYarnConfig().get(
+        HADOOP_SECURITY_AUTHENTICATION);
+    if (authenication != null && authenication.equals("kerberos")) {
+      appendToEnv(service, "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS",
+          "/etc/krb5.conf:/etc/krb5.conf:ro", ",");
+    }
+  }
+
+  private Artifact getDockerArtifact(String dockerImageName) {
+    return new Artifact().type(Artifact.TypeEnum.DOCKER).id(dockerImageName);
+  }
+
+  private Service createServiceByParameters(RunJobParameters parameters)
+      throws IOException {
+    Service service = new Service();
+    service.setName(parameters.getName());
+    service.setVersion(String.valueOf(System.currentTimeMillis()));
+    service.setArtifact(getDockerArtifact(parameters.getDockerImageName()));
+
+    handleServiceEnvs(service, parameters);
+
+    addWorkerComponents(service, parameters);
+
+    if (parameters.getNumPS() > 0) {
+      Component psComponent = new Component();
+      psComponent.setName(TaskType.PS.getComponentName());
+      addCommonEnvironments(psComponent, TaskType.PS);
+      psComponent.setNumberOfContainers((long) parameters.getNumPS());
+      psComponent.setRestartPolicy(Component.RestartPolicyEnum.NEVER);
+      psComponent.setResource(
+          getServiceResourceFromYarnResource(parameters.getPsResource()));
+
+      // Override global docker image if needed.
+      if (parameters.getPsDockerImage() != null) {
+        psComponent.setArtifact(
+            getDockerArtifact(parameters.getPsDockerImage()));
+      }
+      handleLaunchCommand(parameters, TaskType.PS, psComponent);
+      service.addComponent(psComponent);
+    }
+    return service;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ApplicationId submitJob(RunJobParameters parameters)
+      throws IOException, YarnException {
+    Service service = createServiceByParameters(parameters);
+    ServiceClient serviceClient = YarnServiceUtils.createServiceClient(
+        clientContext.getYarnConfig());
+    ApplicationId appid = serviceClient.actionCreate(service);
+    serviceClient.stop();
+    this.serviceSpec = service;
+    return appid;
+  }
+
+  @VisibleForTesting
+  public Service getServiceSpec() {
+    return serviceSpec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceRuntimeFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceRuntimeFactory.java
new file mode 100644
index 0000000..3489e49
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceRuntimeFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
+
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
+import org.apache.hadoop.yarn.submarine.runtimes.common.FSBasedSubmarineStorageImpl;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
+import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
+
+public class YarnServiceRuntimeFactory extends RuntimeFactory {
+
+  public YarnServiceRuntimeFactory(ClientContext clientContext) {
+    super(clientContext);
+  }
+
+  @Override
+  protected JobSubmitter internalCreateJobSubmitter() {
+    return new YarnServiceJobSubmitter(super.clientContext);
+  }
+
+  @Override
+  protected JobMonitor internalCreateJobMonitor() {
+    return new YarnServiceJobMonitor(super.clientContext);
+  }
+
+  @Override
+  protected SubmarineStorage internalCreateSubmarineStorage() {
+    return new FSBasedSubmarineStorageImpl(super.clientContext);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java
new file mode 100644
index 0000000..f7ecc97
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.submarine.common.Envs;
+
+public class YarnServiceUtils {
+  // This will be true only in UT.
+  private static ServiceClient stubServiceClient = null;
+
+  public static ServiceClient createServiceClient(
+      Configuration yarnConfiguration) {
+    if (stubServiceClient != null) {
+      return stubServiceClient;
+    }
+
+    ServiceClient serviceClient = new ServiceClient();
+    serviceClient.init(yarnConfiguration);
+    serviceClient.start();
+    return serviceClient;
+  }
+
+  @VisibleForTesting
+  public static void setStubServiceClient(ServiceClient stubServiceClient) {
+    YarnServiceUtils.stubServiceClient = stubServiceClient;
+  }
+
+  public static String getTFConfigEnv(String curCommponentName, int nWorkers,
+      int nPs, String serviceName, String userName, String domain) {
+    String commonEndpointSuffix =
+        "." + serviceName + "." + userName + "." + domain + ":8000";
+
+    String json = "{\\\"cluster\\\":{";
+
+    String master = getComponentArrayJson("master", 1, commonEndpointSuffix)
+        + ",";
+    String worker = getComponentArrayJson("worker", nWorkers - 1,
+        commonEndpointSuffix) + ",";
+    String ps = getComponentArrayJson("ps", nPs, commonEndpointSuffix) + "},";
+
+    String task =
+        "\\\"task\\\":{" + " \\\"type\\\":\\\"" + curCommponentName + "\\\","
+            + " \\\"index\\\":" + '$' + Envs.TASK_INDEX_ENV + "},";
+    String environment = "\\\"environment\\\":\\\"cloud\\\"}";
+
+    return json + master + worker + ps + task + environment;
+  }
+
+  private static String getComponentArrayJson(String componentName, int count,
+      String endpointSuffix) {
+    String component = "\\\"" + componentName + "\\\":";
+    String array = "[";
+    for (int i = 0; i < count; i++) {
+      array = array + "\\\"" + componentName + "-" + i
+          + endpointSuffix + "\\\"";
+      if (i != count - 1) {
+        array = array + ",";
+      }
+    }
+    array = array + "]";
+    return component + array;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/DeveloperGuide.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/DeveloperGuide.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/DeveloperGuide.md
new file mode 100644
index 0000000..ce26ea7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/DeveloperGuide.md
@@ -0,0 +1,26 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# Developper Guide
+
+(Need add more details)
+
+By default, submarine uses YARN service framework as runtime. If you want to add your own implementation. You can add a new `RuntimeFactory` implementation and configure following option to `submarine.xml` (which should be placed under same `$HADOOP_CONF_DIR`)
+
+```
+<property>
+  <name>submarine.runtime.class</name>
+  <value>... full qualified class name for your runtime factory ... </value>
+</property>
+```

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/QuickStart.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/QuickStart.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/QuickStart.md
new file mode 100644
index 0000000..b720b5a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/QuickStart.md
@@ -0,0 +1,134 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# Quick Start Guide
+
+## Prerequisite
+
+Must:
+- Apache Hadoop 3.1.0, YARN service enabled.
+
+Optional:
+- Enable YARN DNS. (When distributed training required.)
+- Enable GPU on YARN support. (When GPU-based training required.)
+
+## Run jobs
+
+### Commandline options
+
+```$xslt
+usage: job run
+ -checkpoint_path <arg>       Training output directory of the job, could
+                              be local or other FS directory. This
+                              typically includes checkpoint files and
+                              exported model
+ -docker_image <arg>          Docker image name/tag
+ -env <arg>                   Common environment variable of worker/ps
+ -input_path <arg>            Input of the job, could be local or other FS
+                              directory
+ -name <arg>                  Name of the job
+ -num_ps <arg>                Number of PS tasks of the job, by default
+                              it's 0
+ -num_workers <arg>           Numnber of worker tasks of the job, by
+                              default it's 1
+ -ps_docker_image <arg>       Specify docker image for PS, when this is
+                              not specified, PS uses --docker_image as
+                              default.
+ -ps_launch_cmd <arg>         Commandline of worker, arguments will be
+                              directly used to launch the PS
+ -ps_resources <arg>          Resource of each PS, for example
+                              memory-mb=2048,vcores=2,yarn.io/gpu=2
+ -queue <arg>                 Name of queue to run the job, by default it
+                              uses default queue
+ -saved_model_path <arg>      Model exported path (savedmodel) of the job,
+                              which is needed when exported model is not
+                              placed under ${checkpoint_path}could be
+                              local or other FS directory. This will be
+                              used to serve.
+ -tensorboard <arg>           Should we run TensorBoard for this job? By
+                              default it's true
+ -verbose                     Print verbose log for troubleshooting
+ -wait_job_finish             Specified when user want to wait the job
+                              finish
+ -worker_docker_image <arg>   Specify docker image for WORKER, when this
+                              is not specified, WORKER uses --docker_image
+                              as default.
+ -worker_launch_cmd <arg>     Commandline of worker, arguments will be
+                              directly used to launch the worker
+ -worker_resources <arg>      Resource of each worker, for example
+                              memory-mb=2048,vcores=2,yarn.io/gpu=2
+```
+
+### Launch Standalone Tensorflow Application:
+
+#### Commandline
+```
+yarn jar path-to/hadoop-yarn-applications-submarine-3.2.0-SNAPSHOT.jar job run \
+  --env DOCKER_JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre/ \
+  --env DOCKER_HADOOP_HDFS_HOME=/hadoop-3.1.0 --name tf-job-001 \
+  --docker_image <your-docker-image> \
+  --input_path hdfs://default/dataset/cifar-10-data  \
+  --checkpoint_path hdfs://default/tmp/cifar-10-jobdir \
+  --worker_resources memory=4G,vcores=2,gpu=2  \
+  --worker_launch_cmd "python ... (Your training application cmd)"
+```
+
+#### Notes:
+
+1) `DOCKER_JAVA_HOME` points to JAVA_HOME inside Docker image.
+2) `DOCKER_HADOOP_HDFS_HOME` points to HADOOP_HDFS_HOME inside Docker image.
+3) `--worker_resources` can include gpu when you need GPU to train your task.
+
+### Launch Distributed Tensorflow Application:
+
+#### Commandline
+
+```
+yarn jar hadoop-yarn-applications-submarine-<version>.jar job run \
+ --name tf-job-001 --docker_image <your docker image> \
+ --input_path hdfs://default/dataset/cifar-10-data \
+ --checkpoint_path hdfs://default/tmp/cifar-10-jobdir \
+ --env DOCKER_JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre/ \
+ --env DOCKER_HADOOP_HDFS_HOME=/hadoop-3.1.0 \
+ --num_workers 2 \
+ --worker_resources memory=8G,vcores=2,gpu=1 --worker_launch_cmd "cmd for worker ..." \
+ --num_ps 2 \
+ --ps_resources memory=4G,vcores=2,gpu=0 --ps_launch_cmd "cmd for ps" \
+```
+
+#### Notes:
+
+1) Very similar to standalone TF application, but you need to specify #worker/#ps
+2) Different resources can be specified for worker and PS.
+3) `TF_CONFIG` environment will be auto generated and set before executing user's launch command.
+
+## Run jobs
+
+### Get Job Status
+
+```
+yarn jar hadoop-yarn-applications-submarine-3.2.0-SNAPSHOT.jar job show --name tf-job-001
+```
+
+Output looks like:
+```
+Job Meta Info:
+	Application Id: application_1532131617202_0005
+	Input Path: hdfs://default/dataset/cifar-10-data
+	Checkpoint Path: hdfs://default/tmp/cifar-10-jobdir
+	Run Parameters: --name tf-job-001 --docker_image wtan/tf-1.8.0-gpu:0.0.3
+	                (... all your commandline before run the job)
+```
+
+After that, you can run ```tensorboard --logdir=<checkpoint-path>``` to view Tensorboard of the job.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
new file mode 100644
index 0000000..295d6a8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.submarine.client.cli;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
+import org.apache.hadoop.yarn.submarine.common.MockClientContext;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
+import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestRunJobCliParsing {
+  @Before
+  public void before() {
+    SubmarineLogs.verboseOff();
+  }
+
+  @Test
+  public void testPrintHelp() {
+    MockClientContext mockClientContext = new MockClientContext();
+    JobSubmitter mockJobSubmitter = mock(JobSubmitter.class);
+    JobMonitor mockJobMonitor = mock(JobMonitor.class);
+    RunJobCli runJobCli = new RunJobCli(mockClientContext, mockJobSubmitter,
+        mockJobMonitor);
+    runJobCli.printUsages();
+  }
+
+  private MockClientContext getMockClientContext()
+      throws IOException, YarnException {
+    MockClientContext mockClientContext = new MockClientContext();
+    JobSubmitter mockJobSubmitter = mock(JobSubmitter.class);
+    when(mockJobSubmitter.submitJob(any(RunJobParameters.class))).thenReturn(
+        ApplicationId.newInstance(1234L, 1));
+    JobMonitor mockJobMonitor = mock(JobMonitor.class);
+    SubmarineStorage storage = mock(SubmarineStorage.class);
+    RuntimeFactory rtFactory = mock(RuntimeFactory.class);
+
+    when(rtFactory.getJobSubmitterInstance()).thenReturn(mockJobSubmitter);
+    when(rtFactory.getJobMonitorInstance()).thenReturn(mockJobMonitor);
+    when(rtFactory.getSubmarineStorage()).thenReturn(storage);
+
+    mockClientContext.setRuntimeFactory(rtFactory);
+    return mockClientContext;
+  }
+
+  @Test
+  public void testBasicRunJobForDistributedTraining() throws Exception {
+    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
+
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    runJobCli.run(
+        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+            "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
+            "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
+            "--ps_resources", "memory=4G,vcores=4", "--tensorboard", "true",
+            "--ps_launch_cmd", "python run-ps.py", "--verbose" });
+
+    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
+
+    Assert.assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
+    Assert.assertEquals(jobRunParameters.getCheckpointPath(), "hdfs://output");
+    Assert.assertEquals(jobRunParameters.getNumPS(), 2);
+    Assert.assertEquals(jobRunParameters.getPSLaunchCmd(), "python run-ps.py");
+    Assert.assertEquals(Resources.createResource(4096, 4),
+        jobRunParameters.getPsResource());
+    Assert.assertEquals(jobRunParameters.getWorkerLaunchCmd(),
+        "python run-job.py");
+    Assert.assertEquals(Resources.createResource(2048, 2),
+        jobRunParameters.getWorkerResource());
+    Assert.assertEquals(jobRunParameters.getDockerImageName(),
+        "tf-docker:1.1.0");
+    Assert.assertTrue(SubmarineLogs.isVerbose());
+  }
+
+  @Test
+  public void testBasicRunJobForSingleNodeTraining() throws Exception {
+    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    runJobCli.run(
+        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+            "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
+            "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
+            "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
+            "true", "--verbose", "--wait_job_finish" });
+
+    RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
+
+    Assert.assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
+    Assert.assertEquals(jobRunParameters.getCheckpointPath(), "hdfs://output");
+    Assert.assertEquals(jobRunParameters.getNumWorkers(), 1);
+    Assert.assertEquals(jobRunParameters.getWorkerLaunchCmd(),
+        "python run-job.py");
+    Assert.assertEquals(Resources.createResource(4096, 2),
+        jobRunParameters.getWorkerResource());
+    Assert.assertTrue(SubmarineLogs.isVerbose());
+    Assert.assertTrue(jobRunParameters.isWaitJobFinish());
+  }
+
+  @Test
+  public void testLaunchCommandPatternReplace() throws Exception {
+    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    runJobCli.run(
+        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+            "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
+            "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+            "python run-job.py --input=%input_path% --model_dir=%checkpoint_path% --export_dir=%saved_model_path%/savedmodel",
+            "--worker_resources", "memory=2048,vcores=2", "--ps_resources",
+            "memory=4096,vcores=4", "--tensorboard", "true", "--ps_launch_cmd",
+            "python run-ps.py --input=%input_path% --model_dir=%checkpoint_path%/model",
+            "--verbose" });
+
+    Assert.assertEquals(
+        "python run-job.py --input=hdfs://input --model_dir=hdfs://output "
+            + "--export_dir=hdfs://output/savedmodel",
+        runJobCli.getRunJobParameters().getWorkerLaunchCmd());
+    Assert.assertEquals(
+        "python run-ps.py --input=hdfs://input --model_dir=hdfs://output/model",
+        runJobCli.getRunJobParameters().getPSLaunchCmd());
+  }
+
+  @Test
+  public void testResourceUnitParsing() throws Exception {
+    Resource res = CliUtils.createResourceFromString("memory=20g,vcores=3",
+        ResourceUtils.getResourcesTypeInfo());
+    Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
+
+    res = CliUtils.createResourceFromString("memory=20G,vcores=3",
+        ResourceUtils.getResourcesTypeInfo());
+    Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
+
+    res = CliUtils.createResourceFromString("memory=20M,vcores=3",
+        ResourceUtils.getResourcesTypeInfo());
+    Assert.assertEquals(Resources.createResource(20, 3), res);
+
+    res = CliUtils.createResourceFromString("memory=20m,vcores=3",
+        ResourceUtils.getResourcesTypeInfo());
+    Assert.assertEquals(Resources.createResource(20, 3), res);
+
+    res = CliUtils.createResourceFromString("memory-mb=20,vcores=3",
+        ResourceUtils.getResourcesTypeInfo());
+    Assert.assertEquals(Resources.createResource(20, 3), res);
+
+    res = CliUtils.createResourceFromString("memory-mb=20m,vcores=3",
+        ResourceUtils.getResourcesTypeInfo());
+    Assert.assertEquals(Resources.createResource(20, 3), res);
+
+    res = CliUtils.createResourceFromString("memory-mb=20G,vcores=3",
+        ResourceUtils.getResourcesTypeInfo());
+    Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
+
+    // W/o unit for memory means bits, and 20 bits will be rounded to 0
+    res = CliUtils.createResourceFromString("memory=20,vcores=3",
+        ResourceUtils.getResourcesTypeInfo());
+    Assert.assertEquals(Resources.createResource(0, 3), res);
+
+    // Test multiple resources
+    List<ResourceTypeInfo> resTypes = new ArrayList<>(
+        ResourceUtils.getResourcesTypeInfo());
+    resTypes.add(ResourceTypeInfo.newInstance(ResourceInformation.GPU_URI, ""));
+    ResourceUtils.reinitializeResources(resTypes);
+    res = CliUtils.createResourceFromString("memory=2G,vcores=3,gpu=0",
+        resTypes);
+    Assert.assertEquals(2 * 1024, res.getMemorySize());
+    Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
+
+    res = CliUtils.createResourceFromString("memory=2G,vcores=3,gpu=3",
+        resTypes);
+    Assert.assertEquals(2 * 1024, res.getMemorySize());
+    Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));
+
+    res = CliUtils.createResourceFromString("memory=2G,vcores=3",
+        resTypes);
+    Assert.assertEquals(2 * 1024, res.getMemorySize());
+    Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
+
+    res = CliUtils.createResourceFromString("memory=2G,vcores=3,yarn.io/gpu=0",
+        resTypes);
+    Assert.assertEquals(2 * 1024, res.getMemorySize());
+    Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
+
+    res = CliUtils.createResourceFromString("memory=2G,vcores=3,yarn.io/gpu=3",
+        resTypes);
+    Assert.assertEquals(2 * 1024, res.getMemorySize());
+    Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));
+
+    // TODO, add more negative tests.
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestShowJobCliParsing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestShowJobCliParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestShowJobCliParsing.java
new file mode 100644
index 0000000..9c0d872
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestShowJobCliParsing.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.submarine.client.cli;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.client.cli.param.ShowJobParameters;
+import org.apache.hadoop.yarn.submarine.common.MockClientContext;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException;
+import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
+import org.apache.hadoop.yarn.submarine.runtimes.common.MemorySubmarineStorage;
+import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants;
+import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestShowJobCliParsing {
+  @Before
+  public void before() {
+    SubmarineLogs.verboseOff();
+  }
+
+  @Test
+  public void testPrintHelp() {
+    MockClientContext mockClientContext = new MockClientContext();
+    ShowJobCli showJobCli = new ShowJobCli(mockClientContext);
+    showJobCli.printUsages();
+  }
+
+  @Test
+  public void testShowJob()
+      throws InterruptedException, SubmarineException, YarnException,
+      ParseException, IOException {
+    MockClientContext mockClientContext = new MockClientContext();
+    ShowJobCli showJobCli = new ShowJobCli(mockClientContext) {
+      @Override
+      protected void getAndPrintJobInfo() {
+        // do nothing
+      }
+    };
+    showJobCli.run(new String[] { "--name", "my-job" });
+    ShowJobParameters parameters = showJobCli.getParameters();
+    Assert.assertEquals(parameters.getName(), "my-job");
+  }
+
+  private Map<String, String> getMockJobInfo(String jobName) {
+    Map<String, String> map = new HashMap<>();
+    map.put(StorageKeyConstants.APPLICATION_ID,
+        ApplicationId.newInstance(1234L, 1).toString());
+    map.put(StorageKeyConstants.JOB_RUN_ARGS, "job run 123456");
+    map.put(StorageKeyConstants.INPUT_PATH, "hdfs://" + jobName);
+    return map;
+  }
+
+  @Test
+  public void testSimpleShowJob()
+      throws InterruptedException, SubmarineException, YarnException,
+      ParseException, IOException {
+    SubmarineStorage storage = new MemorySubmarineStorage();
+    MockClientContext mockClientContext = new MockClientContext();
+    RuntimeFactory runtimeFactory = mock(RuntimeFactory.class);
+    when(runtimeFactory.getSubmarineStorage()).thenReturn(storage);
+    mockClientContext.setRuntimeFactory(runtimeFactory);
+
+    ShowJobCli showJobCli = new ShowJobCli(mockClientContext);
+
+    try {
+      showJobCli.run(new String[] { "--name", "my-job" });
+    } catch (IOException e) {
+      // expected
+    }
+
+
+    storage.addNewJob("my-job", getMockJobInfo("my-job"));
+    showJobCli.run(new String[] { "--name", "my-job" });
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
new file mode 100644
index 0000000..e1756b8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli.yarnservice;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli;
+import org.apache.hadoop.yarn.submarine.common.MockClientContext;
+import org.apache.hadoop.yarn.submarine.common.api.TaskType;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
+import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants;
+import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
+import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceJobSubmitter;
+import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestYarnServiceRunJobCli {
+  @Before
+  public void before() throws IOException, YarnException {
+    SubmarineLogs.verboseOff();
+    ServiceClient serviceClient = mock(ServiceClient.class);
+    when(serviceClient.actionCreate(any(Service.class))).thenReturn(
+        ApplicationId.newInstance(1234L, 1));
+    YarnServiceUtils.setStubServiceClient(serviceClient);
+  }
+
+  @Test
+  public void testPrintHelp() {
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    runJobCli.printUsages();
+  }
+
+  private Service getServiceSpecFromJobSubmitter(JobSubmitter jobSubmitter) {
+    return ((YarnServiceJobSubmitter) jobSubmitter).getServiceSpec();
+  }
+
+  @Test
+  public void testBasicRunJobForDistributedTraining() throws Exception {
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    runJobCli.run(
+        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+            "--input_path", "s3://input", "--checkpoint_path",
+            "s3://output", "--num_workers", "3", "--num_ps", "2",
+            "--worker_launch_cmd", "python run-job.py", "--worker_resources",
+            "memory=2048M,vcores=2", "--ps_resources", "memory=4096M,vcores=4",
+            "--tensorboard", "true", "--ps_docker_image", "ps.image",
+            "--worker_docker_image", "worker.image",
+            "--ps_launch_cmd", "python run-ps.py", "--verbose" });
+    Service serviceSpec = getServiceSpecFromJobSubmitter(
+        runJobCli.getJobSubmitter());
+    Assert.assertEquals(3, serviceSpec.getComponents().size());
+    Assert.assertTrue(
+        serviceSpec.getComponent(TaskType.WORKER.getComponentName()) != null);
+    Assert.assertTrue(
+        serviceSpec.getComponent(TaskType.PRIMARY_WORKER.getComponentName())
+            != null);
+    Assert.assertTrue(
+        serviceSpec.getComponent(TaskType.PS.getComponentName()) != null);
+    Component primaryWorkerComp = serviceSpec.getComponent(
+        TaskType.PRIMARY_WORKER.getComponentName());
+    Assert.assertEquals(2048, primaryWorkerComp.getResource().calcMemoryMB());
+    Assert.assertEquals(2,
+        primaryWorkerComp.getResource().getCpus().intValue());
+
+    Component workerComp = serviceSpec.getComponent(
+       TaskType.WORKER.getComponentName());
+    Assert.assertEquals(2048, workerComp.getResource().calcMemoryMB());
+    Assert.assertEquals(2, workerComp.getResource().getCpus().intValue());
+
+    Component psComp = serviceSpec.getComponent(TaskType.PS.getComponentName());
+    Assert.assertEquals(4096, psComp.getResource().calcMemoryMB());
+    Assert.assertEquals(4, psComp.getResource().getCpus().intValue());
+
+    Assert.assertEquals("worker.image", workerComp.getArtifact().getId());
+    Assert.assertEquals("ps.image", psComp.getArtifact().getId());
+
+    Assert.assertTrue(SubmarineLogs.isVerbose());
+
+    // TODO, ADD TEST TO USE SERVICE CLIENT TO VALIDATE THE JSON SPEC
+  }
+
+  @Test
+  public void testBasicRunJobForSingleNodeTraining() throws Exception {
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    runJobCli.run(
+        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+            "--input_path", "s3://input", "--checkpoint_path",
+            "s3://output", "--num_workers", "1", "--worker_launch_cmd",
+            "python run-job.py", "--worker_resources", "memory=2G,vcores=2",
+            "--tensorboard", "true", "--verbose" });
+    Service serviceSpec = getServiceSpecFromJobSubmitter(
+        runJobCli.getJobSubmitter());
+    Assert.assertEquals(1, serviceSpec.getComponents().size());
+    Assert.assertTrue(
+        serviceSpec.getComponent(TaskType.PRIMARY_WORKER.getComponentName())
+            != null);
+    Component primaryWorkerComp = serviceSpec.getComponent(
+        TaskType.PRIMARY_WORKER.getComponentName());
+    Assert.assertEquals(2048, primaryWorkerComp.getResource().calcMemoryMB());
+    Assert.assertEquals(2,
+        primaryWorkerComp.getResource().getCpus().intValue());
+
+    Assert.assertTrue(SubmarineLogs.isVerbose());
+
+    // TODO, ADD TEST TO USE SERVICE CLIENT TO VALIDATE THE JSON SPEC
+  }
+
+  @Test
+  public void testParameterStorageForTrainingJob() throws Exception {
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    runJobCli.run(
+        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+            "--input_path", "s3://input", "--checkpoint_path",
+            "s3://output", "--num_workers", "1", "--worker_launch_cmd",
+            "python run-job.py", "--worker_resources", "memory=2G,vcores=2",
+            "--tensorboard", "true", "--verbose" });
+    SubmarineStorage storage =
+        mockClientContext.getRuntimeFactory().getSubmarineStorage();
+    Map<String, String> jobInfo = storage.getJobInfoByName("my-job");
+    Assert.assertTrue(jobInfo.size() > 0);
+    Assert.assertEquals(jobInfo.get(StorageKeyConstants.INPUT_PATH),
+        "s3://input");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[3/3] hadoop git commit: YARN-8561. [Submarine] Initial implementation: Training job submission and job history retrieval. Contributed by Wangda Tan.

Posted by su...@apache.org.
YARN-8561. [Submarine] Initial implementation: Training job submission and job history retrieval. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cadbc8b5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cadbc8b5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cadbc8b5

Branch: refs/heads/trunk
Commit: cadbc8b57f94896aeff2ff5583c9a5ec374c80e2
Parents: a8dae00
Author: Sunil G <su...@apache.org>
Authored: Mon Aug 13 14:22:55 2018 +0530
Committer: Sunil G <su...@apache.org>
Committed: Mon Aug 13 14:22:55 2018 +0530

----------------------------------------------------------------------
 .../hadoop-yarn-submarine/README.md             |  53 +++
 .../hadoop-yarn-submarine/pom.xml               | 213 +++++++++
 .../yarn/submarine/client/cli/AbstractCli.java  |  47 ++
 .../hadoop/yarn/submarine/client/cli/Cli.java   | 106 +++++
 .../yarn/submarine/client/cli/CliConstants.java |  48 ++
 .../yarn/submarine/client/cli/CliUtils.java     | 174 +++++++
 .../yarn/submarine/client/cli/RunJobCli.java    | 204 +++++++++
 .../yarn/submarine/client/cli/ShowJobCli.java   | 125 +++++
 .../client/cli/param/BaseParameters.java        |  56 +++
 .../client/cli/param/RunJobParameters.java      | 222 +++++++++
 .../client/cli/param/RunParameters.java         | 103 +++++
 .../client/cli/param/ShowJobParameters.java     |  18 +
 .../yarn/submarine/common/ClientContext.java    |  77 ++++
 .../hadoop/yarn/submarine/common/Envs.java      |  27 ++
 .../common/api/JobComponentStatus.java          |  73 +++
 .../yarn/submarine/common/api/JobState.java     |  52 +++
 .../yarn/submarine/common/api/JobStatus.java    |  87 ++++
 .../yarn/submarine/common/api/TaskType.java     |  32 ++
 .../api/builder/JobComponentStatusBuilder.java  |  44 ++
 .../common/api/builder/JobStatusBuilder.java    |  64 +++
 .../common/conf/SubmarineConfiguration.java     |  51 +++
 .../submarine/common/conf/SubmarineLogs.java    |  31 ++
 .../common/exception/SubmarineException.java    |  21 +
 .../exception/SubmarineRuntimeException.java    |  25 +
 .../fs/DefaultRemoteDirectoryManager.java       |  84 ++++
 .../common/fs/RemoteDirectoryManager.java       |  30 ++
 .../yarn/submarine/runtimes/RuntimeFactory.java | 106 +++++
 .../common/FSBasedSubmarineStorageImpl.java     | 106 +++++
 .../submarine/runtimes/common/JobMonitor.java   |  84 ++++
 .../submarine/runtimes/common/JobSubmitter.java |  36 ++
 .../runtimes/common/StorageKeyConstants.java    |  24 +
 .../runtimes/common/SubmarineStorage.java       |  57 +++
 .../yarnservice/YarnServiceJobMonitor.java      |  46 ++
 .../yarnservice/YarnServiceJobSubmitter.java    | 458 +++++++++++++++++++
 .../yarnservice/YarnServiceRuntimeFactory.java  |  44 ++
 .../runtimes/yarnservice/YarnServiceUtils.java  |  78 ++++
 .../src/site/DeveloperGuide.md                  |  26 ++
 .../src/site/QuickStart.md                      | 134 ++++++
 .../client/cli/TestRunJobCliParsing.java        | 229 ++++++++++
 .../client/cli/TestShowJobCliParsing.java       | 104 +++++
 .../yarnservice/TestYarnServiceRunJobCli.java   | 167 +++++++
 .../yarnservice/YarnServiceCliTestUtils.java    |  35 ++
 .../submarine/common/MockClientContext.java     |  56 +++
 .../common/fs/MockRemoteDirectoryManager.java   |  83 ++++
 .../runtimes/common/MemorySubmarineStorage.java |  74 +++
 .../common/TestFSBasedSubmarineStorage.java     |  73 +++
 .../yarnservice/TestTFConfigGenerator.java      |  42 ++
 .../src/test/resources/core-site.xml            |  21 +
 .../src/test/resources/hdfs-site.xml            |  21 +
 .../hadoop-yarn-applications/pom.xml            |   1 +
 50 files changed, 4172 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/README.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/README.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/README.md
new file mode 100644
index 0000000..3e04730
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/README.md
@@ -0,0 +1,53 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# Overview
+
+```$xslt
+              _                              _
+             | |                            (_)
+  ___  _   _ | |__   _ __ ___    __ _  _ __  _  _ __    ___
+ / __|| | | || '_ \ | '_ ` _ \  / _` || '__|| || '_ \  / _ \
+ \__ \| |_| || |_) || | | | | || (_| || |   | || | | ||  __/
+ |___/ \__,_||_.__/ |_| |_| |_| \__,_||_|   |_||_| |_| \___|
+
+                             ?
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~|^"~~~~~~~~~~~~~~~~~~~~~~~~~o~~~~~~~~~~~
+        o                   |                  o      __o
+         o                  |                 o     |X__>
+       ___o                 |                __o
+     (X___>--             __|__            |X__>     o
+                         |     \                   __o
+                         |      \                |X__>
+  _______________________|_______\________________
+ <                                                \____________   _
+  \                                                            \ (_)
+   \    O       O       O                                       >=)
+    \__________________________________________________________/ (_)
+```
+
+Submarine is a project which allows infra engineer / data scientist to run *unmodified* Tensorflow programs on YARN.
+
+Goals of Submarine:
+- It allows jobs easy access data/models in HDFS and other storages.
+- Can launch services to serve Tensorflow/MXNet models.
+- Support run distributed Tensorflow jobs with simple configs.
+- Support run user-specified Docker images.
+- Support specify GPU and other resources.
+- Support launch tensorboard for training jobs if user specified.
+- Support customized DNS name for roles (like tensorboard.$user.$domain:6006)
+
+Please jump to [QuickStart](src/site/QuickStart.md) guide to quickly understand how to use this framework.
+
+If you're a developer, please find [Developer](src/site/DeveloperGuide.md) guide for more details.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/pom.xml
new file mode 100644
index 0000000..90a1a6c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/pom.xml
@@ -0,0 +1,213 @@
+<?xml version="1.0"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hadoop-yarn-applications</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>3.2.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hadoop-yarn-submarine</artifactId>
+  <version>3.2.0-SNAPSHOT</version>
+  <name>Yet Another Learning Platform</name>
+
+  <properties>
+    <!-- Needed for generating FindBugs warnings using parent pom -->
+    <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+  </properties>
+
+  <dependencies>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-fs2img</artifactId>
+      <version>3.2.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-services-api</artifactId>
+      <version>3.2.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-services-core</artifactId>
+      <version>3.2.0-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+            <!-- strictly speaking, the unit test is really a regression test. It
+                 needs the main jar to be available to be able to run. -->
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+        <configuration>
+           <archive>
+             <manifest>
+               <mainClass>org.apache.hadoop.yarn.submarine.client.cli.Cli</mainClass>
+             </manifest>
+           </archive>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <environmentVariables>
+            <JAVA_HOME>${java.home}</JAVA_HOME>
+          </environmentVariables>
+       </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/AbstractCli.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/AbstractCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/AbstractCli.java
new file mode 100644
index 0000000..f6a9214
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/AbstractCli.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException;
+
+import java.io.IOException;
+
+public abstract class AbstractCli implements Tool {
+  protected ClientContext clientContext;
+
+  public AbstractCli(ClientContext cliContext) {
+    this.clientContext = cliContext;
+  }
+
+  @Override
+  public abstract int run(String[] args)
+      throws ParseException, IOException, YarnException, InterruptedException,
+      SubmarineException;
+
+  @Override
+  public void setConf(Configuration conf) {
+    clientContext.setSubmarineConfig(conf);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return clientContext.getSubmarineConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java
new file mode 100644
index 0000000..b4c5e4c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.fs.DefaultRemoteDirectoryManager;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+public class Cli {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(Cli.class);
+
+  private static void printHelp() {
+    StringBuilder helpMsg = new StringBuilder();
+    helpMsg.append("\n\nUsage: <object> [<action>] [<args>]\n");
+    helpMsg.append("  Below are all objects / actions:\n");
+    helpMsg.append("    job \n");
+    helpMsg.append("       run : run a job, please see 'job run --help' for usage \n");
+    helpMsg.append("       show : get status of job, please see 'job show --help' for usage \n");
+
+    System.out.println(helpMsg.toString());
+  }
+
+  private static ClientContext getClientContext() {
+    Configuration conf = new YarnConfiguration();
+    ClientContext clientContext = new ClientContext();
+    clientContext.setConfiguration(conf);
+    clientContext.setRemoteDirectoryManager(
+        new DefaultRemoteDirectoryManager(clientContext));
+    RuntimeFactory runtimeFactory = RuntimeFactory.getRuntimeFactory(
+        clientContext);
+    clientContext.setRuntimeFactory(runtimeFactory);
+    return clientContext;
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.out.println("              _                              _              \n"
+        + "             | |                            (_)             \n"
+        + "  ___  _   _ | |__   _ __ ___    __ _  _ __  _  _ __    ___ \n"
+        + " / __|| | | || '_ \\ | '_ ` _ \\  / _` || '__|| || '_ \\  / _ \\\n"
+        + " \\__ \\| |_| || |_) || | | | | || (_| || |   | || | | ||  __/\n"
+        + " |___/ \\__,_||_.__/ |_| |_| |_| \\__,_||_|   |_||_| |_| \\___|\n"
+        + "                                                    \n"
+        + "                             ?\n"
+        + " ~~~~~~~~~~~~~~~~~~~~~~~~~~~|^\"~~~~~~~~~~~~~~~~~~~~~~~~~o~~~~~~~~~~~\n"
+        + "        o                   |                  o      __o\n"
+        + "         o                  |                 o     |X__>\n"
+        + "       ___o                 |                __o\n"
+        + "     (X___>--             __|__            |X__>     o\n"
+        + "                         |     \\                   __o\n"
+        + "                         |      \\                |X__>\n"
+        + "  _______________________|_______\\________________\n"
+        + " <                                                \\____________   _\n"
+        + "  \\                                                            \\ (_)\n"
+        + "   \\    O       O       O                                       >=)\n"
+        + "    \\__________________________________________________________/ (_)\n"
+        + "\n");
+
+    if (CliUtils.argsForHelp(args)) {
+      printHelp();
+      System.exit(0);
+    }
+
+    if (args.length < 2) {
+      LOG.error("Bad parameters specified.");
+      printHelp();
+      System.exit(-1);
+    }
+
+    String[] moduleArgs = Arrays.copyOfRange(args, 2, args.length);
+    ClientContext clientContext = getClientContext();
+
+    if (args[0].equals("job")) {
+      String subCmd = args[1];
+      if (subCmd.equals(CliConstants.RUN)) {
+        new RunJobCli(clientContext).run(moduleArgs);
+      } else if (subCmd.equals(CliConstants.SHOW)) {
+        new ShowJobCli(clientContext).run(moduleArgs);
+      } else {
+        printHelp();
+        throw new IllegalArgumentException("Unknown option for job");
+      }
+    } else {
+      printHelp();
+      throw new IllegalArgumentException("Bad parameters <TODO>");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java
new file mode 100644
index 0000000..d0958a8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli;
+
+/*
+ * NOTE: use lowercase + "_" for the option name
+ */
+public class CliConstants {
+  public static final String RUN = "run";
+  public static final String SERVE = "serve";
+  public static final String LIST = "list";
+  public static final String SHOW = "show";
+  public static final String NAME = "name";
+  public static final String INPUT_PATH = "input_path";
+  public static final String CHECKPOINT_PATH = "checkpoint_path";
+  public static final String SAVED_MODEL_PATH = "saved_model_path";
+  public static final String N_WORKERS = "num_workers";
+  public static final String N_SERVING_TASKS = "num_serving_tasks";
+  public static final String N_PS = "num_ps";
+  public static final String WORKER_RES = "worker_resources";
+  public static final String SERVING_RES = "serving_resources";
+  public static final String PS_RES = "ps_resources";
+  public static final String DOCKER_IMAGE = "docker_image";
+  public static final String QUEUE = "queue";
+  public static final String TENSORBOARD = "tensorboard";
+  public static final String WORKER_LAUNCH_CMD = "worker_launch_cmd";
+  public static final String SERVING_LAUNCH_CMD = "serving_launch_cmd";
+  public static final String PS_LAUNCH_CMD = "ps_launch_cmd";
+  public static final String ENV = "env";
+  public static final String VERBOSE = "verbose";
+  public static final String SERVING_FRAMEWORK = "serving_framework";
+  public static final String STOP = "stop";
+  public static final String WAIT_JOB_FINISH = "wait_job_finish";
+  public static final String PS_DOCKER_IMAGE = "ps_docker_image";
+  public static final String WORKER_DOCKER_IMAGE = "worker_docker_image";
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java
new file mode 100644
index 0000000..6dd3e4d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
+import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CliUtils {
+  private final static String RES_PATTERN = "^[^=]+=\\d+\\s?\\w*$";
+  /**
+   * Replace patterns inside cli
+   *
+   * @return launch command after pattern replace
+   */
+  public static String replacePatternsInLaunchCommand(String specifiedCli,
+      RunJobParameters jobRunParameters,
+      RemoteDirectoryManager directoryManager) throws IOException {
+    String jobDir = jobRunParameters.getCheckpointPath();
+    if (null == jobDir) {
+      jobDir = directoryManager.getJobCheckpointDir(jobRunParameters.getName(),
+          true).toString();
+    }
+
+    String input = jobRunParameters.getInputPath();
+    String savedModelDir = jobRunParameters.getSavedModelPath();
+    if (null == savedModelDir) {
+      savedModelDir = jobDir;
+    }
+
+    Map<String, String> replacePattern = new HashMap<>();
+    if (jobDir != null) {
+      replacePattern.put("%" + CliConstants.CHECKPOINT_PATH + "%", jobDir);
+    }
+    if (input != null) {
+      replacePattern.put("%" + CliConstants.INPUT_PATH + "%", input);
+    }
+    if (savedModelDir != null) {
+      replacePattern.put("%" + CliConstants.SAVED_MODEL_PATH + "%",
+          savedModelDir);
+    }
+
+    String newCli = specifiedCli;
+    for (Map.Entry<String, String> replace : replacePattern.entrySet()) {
+      newCli = newCli.replace(replace.getKey(), replace.getValue());
+    }
+
+    return newCli;
+  }
+
+  // TODO, this duplicated to Client of distributed shell, should cleanup
+  private static Map<String, Long> parseResourcesString(String resourcesStr) {
+    Map<String, Long> resources = new HashMap<>();
+
+    // Ignore the grouping "[]"
+    if (resourcesStr.startsWith("[")) {
+      resourcesStr = resourcesStr.substring(1);
+    }
+    if (resourcesStr.endsWith("]")) {
+      resourcesStr = resourcesStr.substring(0, resourcesStr.length());
+    }
+
+    for (String resource : resourcesStr.trim().split(",")) {
+      resource = resource.trim();
+      if (!resource.matches(RES_PATTERN)) {
+        throw new IllegalArgumentException("\"" + resource + "\" is not a "
+            + "valid resource type/amount pair. "
+            + "Please provide key=amount pairs separated by commas.");
+      }
+      String[] splits = resource.split("=");
+      String key = splits[0], value = splits[1];
+      String units = ResourceUtils.getUnits(value);
+
+      String valueWithoutUnit = value.substring(0, value.length() - units.length()).trim();
+      Long resourceValue = Long.valueOf(valueWithoutUnit);
+
+      // Convert commandline unit to standard YARN unit.
+      if (units.equals("M") || units.equals("m")) {
+        units = "Mi";
+      } else if (units.equals("G") || units.equals("g")) {
+        units = "Gi";
+      } else if (units.isEmpty()) {
+        // do nothing;
+      } else{
+        throw new IllegalArgumentException("Acceptable units are M/G or empty");
+      }
+
+      // special handle memory-mb and memory
+      if (key.equals(ResourceInformation.MEMORY_URI)) {
+        if (!units.isEmpty()) {
+          resourceValue = UnitsConversionUtil.convert(units, "Mi",
+              resourceValue);
+        }
+      }
+
+      if (key.equals("memory")) {
+        key = ResourceInformation.MEMORY_URI;
+        resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue);
+      }
+
+      // special handle gpu
+      if (key.equals("gpu")) {
+        key = ResourceInformation.GPU_URI;
+      }
+
+      // special handle fpga
+      if (key.equals("fpga")) {
+        key = ResourceInformation.FPGA_URI;
+      }
+
+      resources.put(key, resourceValue);
+    }
+    return resources;
+  }
+
+  private static void validateResourceTypes(Iterable<String> resourceNames,
+      List<ResourceTypeInfo> resourceTypes) throws IOException, YarnException {
+    for (String resourceName : resourceNames) {
+      if (!resourceTypes.stream().anyMatch(
+          e -> e.getName().equals(resourceName))) {
+        throw new ResourceNotFoundException(
+            "Unknown resource: " + resourceName);
+      }
+    }
+  }
+
+  public static Resource createResourceFromString(String resourceStr,
+      List<ResourceTypeInfo> resourceTypes) throws IOException, YarnException {
+    Map<String, Long> typeToValue = parseResourcesString(resourceStr);
+    validateResourceTypes(typeToValue.keySet(), resourceTypes);
+    Resource resource = Resource.newInstance(0, 0);
+    for (Map.Entry<String, Long> entry : typeToValue.entrySet()) {
+      resource.setResourceValue(entry.getKey(), entry.getValue());
+    }
+    return resource;
+  }
+
+  // Is it for help?
+  public static boolean argsForHelp(String[] args) {
+    if (args == null || args.length == 0)
+      return true;
+
+    if (args.length == 1) {
+      if (args[0].equals("-h") || args[0].equals("--help")) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java
new file mode 100644
index 0000000..d7dfc0d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
+import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RunJobCli extends AbstractCli {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RunJobCli.class);
+
+  private Options options;
+  private RunJobParameters parameters = new RunJobParameters();
+
+  private JobSubmitter jobSubmitter;
+  private JobMonitor jobMonitor;
+
+  public RunJobCli(ClientContext cliContext) {
+    this(cliContext, cliContext.getRuntimeFactory().getJobSubmitterInstance(),
+        cliContext.getRuntimeFactory().getJobMonitorInstance());
+  }
+
+  @VisibleForTesting
+  public RunJobCli(ClientContext cliContext, JobSubmitter jobSubmitter,
+      JobMonitor jobMonitor) {
+    super(cliContext);
+    options = generateOptions();
+    this.jobSubmitter = jobSubmitter;
+    this.jobMonitor = jobMonitor;
+  }
+
+  public void printUsages() {
+    new HelpFormatter().printHelp("job run", options);
+  }
+
+  private Options generateOptions() {
+    Options options = new Options();
+    options.addOption(CliConstants.NAME, true, "Name of the job");
+    options.addOption(CliConstants.INPUT_PATH, true,
+        "Input of the job, could be local or other FS directory");
+    options.addOption(CliConstants.CHECKPOINT_PATH, true,
+        "Training output directory of the job, "
+            + "could be local or other FS directory. This typically includes "
+            + "checkpoint files and exported model ");
+    options.addOption(CliConstants.SAVED_MODEL_PATH, true,
+        "Model exported path (savedmodel) of the job, which is needed when "
+            + "exported model is not placed under ${checkpoint_path}"
+            + "could be local or other FS directory. This will be used to serve.");
+    options.addOption(CliConstants.N_WORKERS, true,
+        "Numnber of worker tasks of the job, by default it's 1");
+    options.addOption(CliConstants.N_PS, true,
+        "Number of PS tasks of the job, by default it's 0");
+    options.addOption(CliConstants.WORKER_RES, true,
+        "Resource of each worker, for example "
+            + "memory-mb=2048,vcores=2,yarn.io/gpu=2");
+    options.addOption(CliConstants.PS_RES, true,
+        "Resource of each PS, for example "
+            + "memory-mb=2048,vcores=2,yarn.io/gpu=2");
+    options.addOption(CliConstants.DOCKER_IMAGE, true, "Docker image name/tag");
+    options.addOption(CliConstants.QUEUE, true,
+        "Name of queue to run the job, by default it uses default queue");
+    options.addOption(CliConstants.TENSORBOARD, true,
+        "Should we run TensorBoard" + " for this job? By default it's true");
+    options.addOption(CliConstants.WORKER_LAUNCH_CMD, true,
+        "Commandline of worker, arguments will be "
+            + "directly used to launch the worker");
+    options.addOption(CliConstants.PS_LAUNCH_CMD, true,
+        "Commandline of worker, arguments will be "
+            + "directly used to launch the PS");
+    options.addOption(CliConstants.ENV, true,
+        "Common environment variable of worker/ps");
+    options.addOption(CliConstants.VERBOSE, false,
+        "Print verbose log for troubleshooting");
+    options.addOption(CliConstants.WAIT_JOB_FINISH, false,
+        "Specified when user want to wait the job finish");
+    options.addOption(CliConstants.PS_DOCKER_IMAGE, true,
+        "Specify docker image for PS, when this is not specified, PS uses --"
+            + CliConstants.DOCKER_IMAGE + " as default.");
+    options.addOption(CliConstants.WORKER_DOCKER_IMAGE, true,
+        "Specify docker image for WORKER, when this is not specified, WORKER "
+            + "uses --" + CliConstants.DOCKER_IMAGE + " as default.");
+    options.addOption("h", "help", false, "Print help");
+    return options;
+  }
+
+  private void replacePatternsInParameters() throws IOException {
+    if (parameters.getPSLaunchCmd() != null && !parameters.getPSLaunchCmd()
+        .isEmpty()) {
+      String afterReplace = CliUtils.replacePatternsInLaunchCommand(
+          parameters.getPSLaunchCmd(), parameters,
+          clientContext.getRemoteDirectoryManager());
+      parameters.setPSLaunchCmd(afterReplace);
+    }
+
+    if (parameters.getWorkerLaunchCmd() != null && !parameters
+        .getWorkerLaunchCmd().isEmpty()) {
+      String afterReplace = CliUtils.replacePatternsInLaunchCommand(
+          parameters.getWorkerLaunchCmd(), parameters,
+          clientContext.getRemoteDirectoryManager());
+      parameters.setWorkerLaunchCmd(afterReplace);
+    }
+  }
+
+  private void parseCommandLineAndGetRunJobParameters(String[] args)
+      throws ParseException, IOException, YarnException {
+    try {
+      // Do parsing
+      GnuParser parser = new GnuParser();
+      CommandLine cli = parser.parse(options, args);
+      parameters.updateParametersByParsedCommandline(cli, options, clientContext);
+    } catch (ParseException e) {
+      LOG.error("Exception in parse:", e.getMessage());
+      printUsages();
+      throw e;
+    }
+
+    // replace patterns
+    replacePatternsInParameters();
+  }
+
+  private void storeJobInformation(String jobName, ApplicationId applicationId,
+      String[] args) throws IOException {
+    Map<String, String> jobInfo = new HashMap<>();
+    jobInfo.put(StorageKeyConstants.JOB_NAME, jobName);
+    jobInfo.put(StorageKeyConstants.APPLICATION_ID, applicationId.toString());
+
+    if (parameters.getCheckpointPath() != null) {
+      jobInfo.put(StorageKeyConstants.CHECKPOINT_PATH,
+          parameters.getCheckpointPath());
+    }
+    if (parameters.getInputPath() != null) {
+      jobInfo.put(StorageKeyConstants.INPUT_PATH,
+          parameters.getInputPath());
+    }
+    if (parameters.getSavedModelPath() != null) {
+      jobInfo.put(StorageKeyConstants.SAVED_MODEL_PATH,
+          parameters.getSavedModelPath());
+    }
+
+    String joinedArgs = String.join(" ", args);
+    jobInfo.put(StorageKeyConstants.JOB_RUN_ARGS, joinedArgs);
+    clientContext.getRuntimeFactory().getSubmarineStorage().addNewJob(jobName,
+        jobInfo);
+  }
+
+  @Override
+  public int run(String[] args)
+      throws ParseException, IOException, YarnException, InterruptedException,
+      SubmarineException {
+    if (CliUtils.argsForHelp(args)) {
+      printUsages();
+      return 0;
+    }
+
+    parseCommandLineAndGetRunJobParameters(args);
+    ApplicationId applicationId = this.jobSubmitter.submitJob(parameters);
+    storeJobInformation(parameters.getName(), applicationId, args);
+    if (parameters.isWaitJobFinish()) {
+      this.jobMonitor.waitTrainingFinal(parameters.getName());
+    }
+
+    return 0;
+  }
+
+  @VisibleForTesting
+  public JobSubmitter getJobSubmitter() {
+    return jobSubmitter;
+  }
+
+  @VisibleForTesting
+  RunJobParameters getRunJobParameters() {
+    return parameters;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/ShowJobCli.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/ShowJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/ShowJobCli.java
new file mode 100644
index 0000000..6b76192
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/ShowJobCli.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.client.cli.param.ShowJobParameters;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException;
+import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants;
+import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class ShowJobCli extends AbstractCli {
+  private static final Logger LOG = LoggerFactory.getLogger(ShowJobCli.class);
+
+  private Options options;
+  private ShowJobParameters parameters = new ShowJobParameters();
+
+  public ShowJobCli(ClientContext cliContext) {
+    super(cliContext);
+    options = generateOptions();
+  }
+
+  public void printUsages() {
+    new HelpFormatter().printHelp("job show", options);
+  }
+
+  private Options generateOptions() {
+    Options options = new Options();
+    options.addOption(CliConstants.NAME, true, "Name of the job");
+    options.addOption("h", "help", false, "Print help");
+    return options;
+  }
+
+  private void parseCommandLineAndGetShowJobParameters(String[] args)
+      throws IOException, YarnException {
+    // Do parsing
+    GnuParser parser = new GnuParser();
+    CommandLine cli;
+    try {
+      cli = parser.parse(options, args);
+      parameters.updateParametersByParsedCommandline(cli, options,
+          clientContext);
+    } catch (ParseException e) {
+      printUsages();
+    }
+  }
+
+  private void printIfNotNull(String keyForPrint, String keyInStorage,
+      Map<String, String> jobInfo) {
+    if (jobInfo.containsKey(keyInStorage)) {
+      System.out.println("\t" + keyForPrint + ": " + jobInfo.get(keyInStorage));
+    }
+  }
+
+  private void printJobInfo(Map<String, String> jobInfo) {
+    System.out.println("Job Meta Info:");
+    printIfNotNull("Application Id", StorageKeyConstants.APPLICATION_ID,
+        jobInfo);
+    printIfNotNull("Input Path", StorageKeyConstants.INPUT_PATH, jobInfo);
+    printIfNotNull("Saved Model Path", StorageKeyConstants.SAVED_MODEL_PATH,
+        jobInfo);
+    printIfNotNull("Checkpoint Path", StorageKeyConstants.CHECKPOINT_PATH,
+        jobInfo);
+    printIfNotNull("Run Parameters", StorageKeyConstants.JOB_RUN_ARGS,
+        jobInfo);
+  }
+
+  @VisibleForTesting
+  protected void getAndPrintJobInfo() throws IOException {
+    SubmarineStorage storage =
+        clientContext.getRuntimeFactory().getSubmarineStorage();
+
+    Map<String, String> jobInfo = null;
+    try {
+      jobInfo = storage.getJobInfoByName(parameters.getName());
+    } catch (IOException e) {
+      LOG.error("Failed to retrieve job info", e);
+      throw e;
+    }
+
+    printJobInfo(jobInfo);
+  }
+
+  @VisibleForTesting
+  public ShowJobParameters getParameters() {
+    return parameters;
+  }
+
+  @Override
+  public int run(String[] args)
+      throws ParseException, IOException, YarnException, InterruptedException,
+      SubmarineException {
+    if (CliUtils.argsForHelp(args)) {
+      printUsages();
+      return 0;
+    }
+
+    parseCommandLineAndGetShowJobParameters(args);
+    getAndPrintJobInfo();
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/BaseParameters.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/BaseParameters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/BaseParameters.java
new file mode 100644
index 0000000..609e868
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/BaseParameters.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli.param;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+
+import java.io.IOException;
+
+/**
+ * Base class of all parameters.
+ */
+public abstract class BaseParameters {
+  private String name;
+
+  public void updateParametersByParsedCommandline(CommandLine parsedCommandLine,
+      Options options, ClientContext clientContext)
+      throws ParseException, IOException, YarnException {
+    String name = parsedCommandLine.getOptionValue(CliConstants.NAME);
+    if (name == null) {
+      throw new ParseException("--name is absent");
+    }
+
+    if (parsedCommandLine.hasOption(CliConstants.VERBOSE)) {
+      SubmarineLogs.verboseOn();
+    }
+
+    this.setName(name);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public BaseParameters setName(String name) {
+    this.name = name;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
new file mode 100644
index 0000000..6cab9e3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli.param;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
+import org.apache.hadoop.yarn.submarine.client.cli.CliUtils;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+
+import java.io.IOException;
+
+/**
+ * Parameters used to run a job
+ */
+public class RunJobParameters extends RunParameters {
+  private String input;
+  private String checkpointPath;
+
+  private int numWorkers;
+  private int numPS;
+  private Resource workerResource;
+  private Resource psResource;
+  private boolean tensorboardEnabled;
+  private String workerLaunchCmd;
+  private String psLaunchCmd;
+
+  private String psDockerImage = null;
+  private String workerDockerImage = null;
+
+  private boolean waitJobFinish = false;
+  private boolean distributed = false;
+
+  @Override
+  public void updateParametersByParsedCommandline(CommandLine parsedCommandLine,
+      Options options, ClientContext clientContext)
+      throws ParseException, IOException, YarnException {
+
+    String input = parsedCommandLine.getOptionValue(CliConstants.INPUT_PATH);
+    String jobDir = parsedCommandLine.getOptionValue(CliConstants.CHECKPOINT_PATH);
+    int nWorkers = 1;
+    if (parsedCommandLine.getOptionValue(CliConstants.N_WORKERS) != null) {
+      nWorkers = Integer.parseInt(
+          parsedCommandLine.getOptionValue(CliConstants.N_WORKERS));
+    }
+
+    int nPS = 0;
+    if (parsedCommandLine.getOptionValue(CliConstants.N_PS) != null) {
+      nPS = Integer.parseInt(
+          parsedCommandLine.getOptionValue(CliConstants.N_PS));
+    }
+
+    // Check #workers and #ps.
+    // When distributed training is required
+    if (nWorkers >= 2 && nPS > 0) {
+      distributed = true;
+    } else if (nWorkers == 1 && nPS > 0) {
+      throw new ParseException("Only specified one worker but non-zero PS, "
+          + "please double check.");
+    }
+
+    String workerResourceStr = parsedCommandLine.getOptionValue(
+        CliConstants.WORKER_RES);
+    if (workerResourceStr == null) {
+      throw new ParseException("--" + CliConstants.WORKER_RES + " is absent.");
+    }
+    Resource workerResource = CliUtils.createResourceFromString(
+        workerResourceStr,
+        clientContext.getOrCreateYarnClient().getResourceTypeInfo());
+
+    Resource psResource = null;
+    if (nPS > 0) {
+      String psResourceStr = parsedCommandLine.getOptionValue(CliConstants.PS_RES);
+      if (psResourceStr == null) {
+        throw new ParseException("--" + CliConstants.PS_RES + " is absent.");
+      }
+      psResource = CliUtils.createResourceFromString(psResourceStr,
+          clientContext.getOrCreateYarnClient().getResourceTypeInfo());
+    }
+
+    boolean tensorboard = false;
+    if (parsedCommandLine.getOptionValue(CliConstants.TENSORBOARD) != null) {
+      tensorboard = Boolean.parseBoolean(
+          parsedCommandLine.getOptionValue(CliConstants.TENSORBOARD));
+    }
+
+    if (parsedCommandLine.hasOption(CliConstants.WAIT_JOB_FINISH)) {
+      this.waitJobFinish = true;
+    }
+
+    psDockerImage = parsedCommandLine.getOptionValue(
+        CliConstants.PS_DOCKER_IMAGE);
+    workerDockerImage = parsedCommandLine.getOptionValue(
+        CliConstants.WORKER_DOCKER_IMAGE);
+
+    String workerLaunchCmd = parsedCommandLine.getOptionValue(
+        CliConstants.WORKER_LAUNCH_CMD);
+    String psLaunchCommand = parsedCommandLine.getOptionValue(
+        CliConstants.PS_LAUNCH_CMD);
+
+    this.setInputPath(input).setCheckpointPath(jobDir).setNumPS(nPS).setNumWorkers(nWorkers)
+        .setPSLaunchCmd(psLaunchCommand).setWorkerLaunchCmd(workerLaunchCmd)
+        .setPsResource(psResource).setWorkerResource(workerResource)
+        .setTensorboardEnabled(tensorboard);
+
+    super.updateParametersByParsedCommandline(parsedCommandLine,
+        options, clientContext);
+  }
+
+  public String getInputPath() {
+    return input;
+  }
+
+  public RunJobParameters setInputPath(String input) {
+    this.input = input;
+    return this;
+  }
+
+  public String getCheckpointPath() {
+    return checkpointPath;
+  }
+
+  public RunJobParameters setCheckpointPath(String checkpointPath) {
+    this.checkpointPath = checkpointPath;
+    return this;
+  }
+
+  public int getNumWorkers() {
+    return numWorkers;
+  }
+
+  public RunJobParameters setNumWorkers(int numWorkers) {
+    this.numWorkers = numWorkers;
+    return this;
+  }
+
+  public int getNumPS() {
+    return numPS;
+  }
+
+  public RunJobParameters setNumPS(int numPS) {
+    this.numPS = numPS;
+    return this;
+  }
+
+  public Resource getWorkerResource() {
+    return workerResource;
+  }
+
+  public RunJobParameters setWorkerResource(Resource workerResource) {
+    this.workerResource = workerResource;
+    return this;
+  }
+
+  public Resource getPsResource() {
+    return psResource;
+  }
+
+  public RunJobParameters setPsResource(Resource psResource) {
+    this.psResource = psResource;
+    return this;
+  }
+
+  public boolean isTensorboardEnabled() {
+    return tensorboardEnabled;
+  }
+
+  public RunJobParameters setTensorboardEnabled(boolean tensorboardEnabled) {
+    this.tensorboardEnabled = tensorboardEnabled;
+    return this;
+  }
+
+  public String getWorkerLaunchCmd() {
+    return workerLaunchCmd;
+  }
+
+  public RunJobParameters setWorkerLaunchCmd(String workerLaunchCmd) {
+    this.workerLaunchCmd = workerLaunchCmd;
+    return this;
+  }
+
+  public String getPSLaunchCmd() {
+    return psLaunchCmd;
+  }
+
+  public RunJobParameters setPSLaunchCmd(String psLaunchCmd) {
+    this.psLaunchCmd = psLaunchCmd;
+    return this;
+  }
+
+  public boolean isWaitJobFinish() {
+    return waitJobFinish;
+  }
+
+
+  public String getPsDockerImage() {
+    return psDockerImage;
+  }
+
+  public String getWorkerDockerImage() {
+    return workerDockerImage;
+  }
+
+  public boolean isDistributed() {
+    return distributed;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunParameters.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunParameters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunParameters.java
new file mode 100644
index 0000000..28884d8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunParameters.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli.param;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Parameters required to run anything on cluster. Such as run job / serve model
+ */
+public abstract class RunParameters extends BaseParameters {
+  private String savedModelPath;
+  private String dockerImageName;
+  private List<String> envars = new ArrayList<>();
+  private String queue;
+
+  @Override
+  public void updateParametersByParsedCommandline(CommandLine parsedCommandLine,
+      Options options, ClientContext clientContext) throws ParseException,
+      IOException, YarnException {
+    String savedModelPath = parsedCommandLine.getOptionValue(
+        CliConstants.SAVED_MODEL_PATH);
+    this.setSavedModelPath(savedModelPath);
+
+    // Envars
+    List<String> envarsList = new ArrayList<>();
+    String[] envars = parsedCommandLine.getOptionValues(CliConstants.ENV);
+    if (envars != null) {
+      for (String envar : envars) {
+        envarsList.add(envar);
+      }
+    }
+    this.setEnvars(envarsList);
+
+    String queue = parsedCommandLine.getOptionValue(
+        CliConstants.QUEUE);
+    this.setQueue(queue);
+
+    String dockerImage = parsedCommandLine.getOptionValue(
+        CliConstants.DOCKER_IMAGE);
+    this.setDockerImageName(dockerImage);
+
+    super.updateParametersByParsedCommandline(parsedCommandLine,
+        options, clientContext);
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public RunParameters setQueue(String queue) {
+    this.queue = queue;
+    return this;
+  }
+
+  public String getDockerImageName() {
+    return dockerImageName;
+  }
+
+  public RunParameters setDockerImageName(String dockerImageName) {
+    this.dockerImageName = dockerImageName;
+    return this;
+  }
+
+
+  public List<String> getEnvars() {
+    return envars;
+  }
+
+  public RunParameters setEnvars(List<String> envars) {
+    this.envars = envars;
+    return this;
+  }
+
+  public String getSavedModelPath() {
+    return savedModelPath;
+  }
+
+  public RunParameters setSavedModelPath(String savedModelPath) {
+    this.savedModelPath = savedModelPath;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/ShowJobParameters.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/ShowJobParameters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/ShowJobParameters.java
new file mode 100644
index 0000000..e5f19d6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/ShowJobParameters.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli.param;
+
+public class ShowJobParameters extends BaseParameters {
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java
new file mode 100644
index 0000000..31a8b1b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration;
+import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
+import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
+
+public class ClientContext {
+  private Configuration yarnConf = new YarnConfiguration();
+
+  private RemoteDirectoryManager remoteDirectoryManager;
+  private YarnClient yarnClient;
+  private Configuration submarineConfig;
+  private RuntimeFactory runtimeFactory;
+
+  public ClientContext() {
+    submarineConfig = new SubmarineConfiguration();
+  }
+
+  public synchronized YarnClient getOrCreateYarnClient() {
+    if (yarnClient == null) {
+      yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(yarnConf);
+      yarnClient.start();
+    }
+    return yarnClient;
+  }
+
+  public Configuration getYarnConfig() {
+    return yarnConf;
+  }
+
+  public void setConfiguration(Configuration conf) {
+    this.yarnConf = conf;
+  }
+
+  public RemoteDirectoryManager getRemoteDirectoryManager() {
+    return remoteDirectoryManager;
+  }
+
+  public void setRemoteDirectoryManager(
+      RemoteDirectoryManager remoteDirectoryManager) {
+    this.remoteDirectoryManager = remoteDirectoryManager;
+  }
+
+  public Configuration getSubmarineConfig() {
+    return submarineConfig;
+  }
+
+  public void setSubmarineConfig(Configuration submarineConfig) {
+    this.submarineConfig = submarineConfig;
+  }
+
+  public RuntimeFactory getRuntimeFactory() {
+    return runtimeFactory;
+  }
+
+  public void setRuntimeFactory(RuntimeFactory runtimeFactory) {
+    this.runtimeFactory = runtimeFactory;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/Envs.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/Envs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/Envs.java
new file mode 100644
index 0000000..a1d80db
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/Envs.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common;
+
+public class Envs {
+  public static final String TASK_TYPE_ENV = "_TASK_TYPE";
+  public static final String TASK_INDEX_ENV = "_TASK_INDEX";
+
+  /*
+   * HDFS/HADOOP-related configs
+   */
+  public static final String HADOOP_HDFS_HOME = "HADOOP_HDFS_HOME";
+  public static final String JAVA_HOME = "JAVA_HOME";
+  public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/JobComponentStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/JobComponentStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/JobComponentStatus.java
new file mode 100644
index 0000000..22468c2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/JobComponentStatus.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.api;
+
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
+
+/**
+ * Status of component of training job
+ */
+public class JobComponentStatus {
+  private String compName;
+  private long numReadyContainers = 0;
+  private long numRunningButUnreadyContainers = 0;
+  private long totalAskedContainers;
+
+  public JobComponentStatus(String compName, long nReadyContainers,
+      long nRunningButUnreadyContainers, long totalAskedContainers) {
+    this.compName = compName;
+    this.numReadyContainers = nReadyContainers;
+    this.numRunningButUnreadyContainers = nRunningButUnreadyContainers;
+    this.totalAskedContainers = totalAskedContainers;
+  }
+
+  public String getCompName() {
+    return compName;
+  }
+
+  public void setCompName(String compName) {
+    this.compName = compName;
+  }
+
+  public long getNumReadyContainers() {
+    return numReadyContainers;
+  }
+
+  public void setNumReadyContainers(long numReadyContainers) {
+    this.numReadyContainers = numReadyContainers;
+  }
+
+  public long getNumRunningButUnreadyContainers() {
+    return numRunningButUnreadyContainers;
+  }
+
+  public void setNumRunningButUnreadyContainers(
+      long numRunningButUnreadyContainers) {
+    this.numRunningButUnreadyContainers = numRunningButUnreadyContainers;
+  }
+
+  public long getTotalAskedContainers() {
+    return totalAskedContainers;
+  }
+
+  public void setTotalAskedContainers(long totalAskedContainers) {
+    this.totalAskedContainers = totalAskedContainers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/JobState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/JobState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/JobState.java
new file mode 100644
index 0000000..eef273a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/JobState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.api;
+
+/**
+ * State of training job
+ */
+public enum JobState {
+  /**
+   * Job accepted by scheduler and start running
+   */
+  RUNNING,
+
+  /**
+   * Job killed by user
+   */
+  KILLED,
+
+  /**
+   * Job failed
+   */
+  FAILED,
+
+  /**
+   * Job succeeded
+   */
+  SUCCEEDED,
+
+  /**
+   * Job paused by user
+   */
+  PAUSED;
+
+  public static boolean isFinal(JobState state) {
+    return state == KILLED || state == SUCCEEDED || state == FAILED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/JobStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/JobStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/JobStatus.java
new file mode 100644
index 0000000..6e390f3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/JobStatus.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.api;
+
+import java.io.PrintStream;
+import java.time.Instant;
+import java.util.List;
+
+/**
+ * Status of training job.
+ */
+public class JobStatus {
+
+  protected String jobName;
+  protected JobState state;
+  protected String tensorboardLink = "N/A";
+  protected List<JobComponentStatus> componentStatus;
+
+  public void nicePrint(PrintStream out) {
+    out.println(
+        "Job Name=" + this.jobName + ", status=" + state.name() + " time="
+            + Instant.now());
+    if (JobState.isFinal(this.state)) {
+      return;
+    }
+
+    if (tensorboardLink.startsWith("http")) {
+      out.println("  Tensorboard link: " + tensorboardLink);
+    }
+
+    out.println("  Components:");
+    for (JobComponentStatus comp : componentStatus) {
+      out.println("    [" + comp.getCompName() + "] Ready=" + comp
+          .getNumReadyContainers() + " + Running-But-Non-Ready=" + comp
+          .getNumRunningButUnreadyContainers() + " | Asked=" + comp
+          .getTotalAskedContainers());
+    }
+    out.println("------------------");
+  }
+
+  public JobState getState() {
+    return state;
+  }
+
+  public String getTensorboardLink() {
+    return tensorboardLink;
+  }
+
+  public List<JobComponentStatus> getComponentStatus() {
+    return componentStatus;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  public void setState(JobState state) {
+    this.state = state;
+  }
+
+  public void setTensorboardLink(String tensorboardLink) {
+    this.tensorboardLink = tensorboardLink;
+  }
+
+  public void setComponentStatus(List<JobComponentStatus> componentStatus) {
+    this.componentStatus = componentStatus;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/TaskType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/TaskType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/TaskType.java
new file mode 100644
index 0000000..535d994
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/TaskType.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.api;
+
+public enum TaskType {
+  PRIMARY_WORKER("master"),
+  WORKER("worker"),
+  PS("ps"),
+  TENSORBOARD("tensorboard");
+
+  private String compName;
+
+  TaskType(String compName) {
+    this.compName = compName;
+  }
+
+  public String getComponentName() {
+    return compName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/builder/JobComponentStatusBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/builder/JobComponentStatusBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/builder/JobComponentStatusBuilder.java
new file mode 100644
index 0000000..fbefe6b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/builder/JobComponentStatusBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.api.builder;
+
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
+import org.apache.hadoop.yarn.submarine.common.api.JobComponentStatus;
+
+public class JobComponentStatusBuilder {
+  public static JobComponentStatus fromServiceComponent(Component component) {
+    long totalAskedContainers = component.getNumberOfContainers();
+    int numReadyContainers = 0;
+    int numRunningButUnreadyContainers = 0;
+    String compName = component.getName();
+
+    for (Container c : component.getContainers()) {
+      if (c.getState() == ContainerState.READY) {
+        numReadyContainers++;
+      } else if (c.getState() == ContainerState.RUNNING_BUT_UNREADY) {
+        numRunningButUnreadyContainers++;
+      }
+    }
+
+    return new JobComponentStatus(compName, numReadyContainers,
+        numRunningButUnreadyContainers, totalAskedContainers);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/builder/JobStatusBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/builder/JobStatusBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/builder/JobStatusBuilder.java
new file mode 100644
index 0000000..2f7971e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/api/builder/JobStatusBuilder.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.api.builder;
+
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.submarine.common.api.JobComponentStatus;
+import org.apache.hadoop.yarn.submarine.common.api.JobState;
+import org.apache.hadoop.yarn.submarine.common.api.JobStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class JobStatusBuilder {
+  public static JobStatus fromServiceSpec(Service serviceSpec) {
+    JobStatus status = new JobStatus();
+    status.setState(fromServiceState(serviceSpec.getState()));
+
+    // If it is a final state, return.
+    if (JobState.isFinal(status.getState())) {
+      return status;
+    }
+
+    List<JobComponentStatus> componentStatusList = new ArrayList<>();
+
+    for (Component component : serviceSpec.getComponents()) {
+      componentStatusList.add(
+          JobComponentStatusBuilder.fromServiceComponent(component));
+    }
+    status.setComponentStatus(componentStatusList);
+
+    // TODO, handle tensorboard differently.
+    // status.setTensorboardLink(getTensorboardLink(serviceSpec, clientContext));
+
+    status.setJobName(serviceSpec.getName());
+
+    return status;
+  }
+
+  private static JobState fromServiceState(ServiceState serviceState) {
+    switch (serviceState) {
+    case STOPPED:
+      // TODO, once YARN-8488 gets committed, we need to update this.
+      return JobState.SUCCEEDED;
+    case FAILED:
+      return JobState.FAILED;
+    }
+
+    return JobState.RUNNING;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java
new file mode 100644
index 0000000..c9e6b7b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class SubmarineConfiguration extends Configuration {
+  private static final String SUBMARINE_CONFIGURATION_FILE = "submarine.xml";
+
+  public SubmarineConfiguration() {
+    this(new Configuration(false), true);
+  }
+
+  public SubmarineConfiguration(Configuration configuration) {
+    this(configuration, false);
+  }
+
+  public SubmarineConfiguration(Configuration configuration,
+      boolean loadLocalConfig) {
+    super(configuration);
+    if (loadLocalConfig) {
+      addResource(SUBMARINE_CONFIGURATION_FILE);
+    }
+  }
+
+  /*
+   * Runtime of submarine
+   */
+
+  private static final String PREFIX = "submarine.";
+
+  public static final String RUNTIME_CLASS = PREFIX + "runtime.class";
+  public static final String DEFAULT_RUNTIME_CLASS =
+      "org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceRuntimeFactory";
+
+  public void setSubmarineRuntimeClass(String runtimeClass) {
+    set(RUNTIME_CLASS, runtimeClass);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineLogs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineLogs.java
new file mode 100644
index 0000000..6bb3248
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineLogs.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.conf;
+
+public class SubmarineLogs {
+  private static volatile boolean verbose = false;
+
+  public static boolean isVerbose() {
+    return SubmarineLogs.verbose;
+  }
+
+  public static void verboseOn() {
+    SubmarineLogs.verbose = true;
+  }
+
+  public static void verboseOff() {
+    SubmarineLogs.verbose = false;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org