You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/13 08:53:13 UTC
[2/3] hadoop git commit: YARN-8561. [Submarine] Initial
implementation: Training job submission and job history retrieval.
Contributed by Wangda Tan.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineException.java
new file mode 100644
index 0000000..b6a39b9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineException.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.exception;
+
/**
 * Checked exception for recoverable Submarine errors that callers are
 * expected to handle (for example, a training job that finishes in a
 * FAILED or KILLED state).
 */
public class SubmarineException extends Exception {
  private static final long serialVersionUID = 1L;

  /**
   * @param msg human-readable description of the failure
   */
  public SubmarineException(String msg) {
    super(msg);
  }

  /**
   * Added for parity with {@code SubmarineRuntimeException} so callers can
   * preserve the underlying cause instead of dropping it.
   *
   * @param msg human-readable description of the failure
   * @param cause underlying cause, kept for diagnostics
   */
  public SubmarineException(String msg, Throwable cause) {
    super(msg, cause);
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineRuntimeException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineRuntimeException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineRuntimeException.java
new file mode 100644
index 0000000..4fb74fd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/exception/SubmarineRuntimeException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.exception;
+
/**
 * Unchecked exception for non-recoverable Submarine failures, such as
 * misconfiguration discovered while wiring up the runtime.
 */
public class SubmarineRuntimeException extends RuntimeException {

  /**
   * @param message human-readable description of the failure
   */
  public SubmarineRuntimeException(String message) {
    super(message);
  }

  /**
   * @param message human-readable description of the failure
   * @param cause underlying cause, kept for diagnostics
   */
  public SubmarineRuntimeException(String message, Throwable cause) {
    super(message, cause);
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
new file mode 100644
index 0000000..fe8956a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.fs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+
+import java.io.IOException;
+
+/**
+ * Manages remote directories for staging, log, etc.
+ * TODO, need to properly handle permission / name validation, etc.
+ */
+public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
+ FileSystem fs;
+
+ public DefaultRemoteDirectoryManager(ClientContext context) {
+ try {
+ this.fs = FileSystem.get(context.getYarnConfig());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Path getJobStagingArea(String jobName, boolean create) throws IOException {
+ Path staging = new Path(getJobRootFolder(jobName), "staging");
+ if (create) {
+ createFolderIfNotExist(staging);
+ }
+ return staging;
+ }
+
+ @Override
+ public Path getJobCheckpointDir(String jobName, boolean create)
+ throws IOException {
+ Path jobDir = new Path(getJobStagingArea(jobName, create),
+ CliConstants.CHECKPOINT_PATH);
+ if (create) {
+ createFolderIfNotExist(jobDir);
+ }
+ return jobDir;
+ }
+
+ @Override
+ public Path getModelDir(String modelName, boolean create) throws IOException {
+ Path modelDir = new Path(new Path("submarine", "models"), modelName);
+ if (create) {
+ createFolderIfNotExist(modelDir);
+ }
+ return modelDir;
+ }
+
+ @Override
+ public FileSystem getFileSystem() {
+ return fs;
+ }
+
+ private Path getJobRootFolder(String jobName) throws IOException {
+ return new Path(new Path("submarine", "jobs"), jobName);
+ }
+
+ private void createFolderIfNotExist(Path path) throws IOException {
+ if (!fs.exists(path)) {
+ if (!fs.mkdirs(path)) {
+ throw new IOException("Failed to create folder=" + path);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java
new file mode 100644
index 0000000..132b314
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.common.fs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
/**
 * Abstraction over the remote directory layout used by Submarine:
 * per-job staging areas, checkpoint directories and model directories.
 */
public interface RemoteDirectoryManager {
  /**
   * Get the staging area of a job.
   * @param jobName name of the job
   * @param create whether to create the directory when it does not exist
   * @return path of the staging area
   * @throws IOException on filesystem errors
   */
  Path getJobStagingArea(String jobName, boolean create) throws IOException;

  /**
   * Get the checkpoint directory of a job.
   * @param jobName name of the job
   * @param create whether to create the directory when it does not exist
   * @return path of the checkpoint directory
   * @throws IOException on filesystem errors
   */
  Path getJobCheckpointDir(String jobName, boolean create) throws IOException;

  /**
   * Get the directory of a model.
   * @param modelName name of the model
   * @param create whether to create the directory when it does not exist
   * @return path of the model directory
   * @throws IOException on filesystem errors
   */
  Path getModelDir(String modelName, boolean create) throws IOException;

  /**
   * Get the underlying {@link FileSystem}.
   * @return the file system instance
   * @throws IOException on filesystem errors
   */
  FileSystem getFileSystem() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/RuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/RuntimeFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/RuntimeFactory.java
new file mode 100644
index 0000000..9c164c6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/RuntimeFactory.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration;
+import org.apache.hadoop.yarn.submarine.common.exception.SubmarineRuntimeException;
+import org.apache.hadoop.yarn.submarine.runtimes.common.FSBasedSubmarineStorageImpl;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
+import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
+import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceJobMonitor;
+import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceJobSubmitter;
+
+import java.lang.reflect.InvocationTargetException;
+
+public abstract class RuntimeFactory {
+ protected ClientContext clientContext;
+ private JobSubmitter jobSubmitter;
+ private JobMonitor jobMonitor;
+ private SubmarineStorage submarineStorage;
+
+ public RuntimeFactory(ClientContext clientContext) {
+ this.clientContext = clientContext;
+ }
+
+ public static RuntimeFactory getRuntimeFactory(
+ ClientContext clientContext) {
+ Configuration submarineConfiguration =
+ clientContext.getSubmarineConfig();
+ String runtimeClass = submarineConfiguration.get(
+ SubmarineConfiguration.RUNTIME_CLASS,
+ SubmarineConfiguration.DEFAULT_RUNTIME_CLASS);
+
+ try {
+ Class<?> runtimeClazz = Class.forName(runtimeClass);
+ if (RuntimeFactory.class.isAssignableFrom(runtimeClazz)) {
+ return (RuntimeFactory) runtimeClazz.getConstructor(ClientContext.class).newInstance(clientContext);
+ } else {
+ throw new SubmarineRuntimeException("Class: " + runtimeClass
+ + " not instance of " + RuntimeFactory.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException | IllegalAccessException |
+ InstantiationException | NoSuchMethodException |
+ InvocationTargetException e) {
+ throw new SubmarineRuntimeException(
+ "Could not instantiate RuntimeFactory: " + runtimeClass, e);
+ }
+ }
+
+ protected abstract JobSubmitter internalCreateJobSubmitter();
+
+ protected abstract JobMonitor internalCreateJobMonitor();
+
+ protected abstract SubmarineStorage internalCreateSubmarineStorage();
+
+ public synchronized JobSubmitter getJobSubmitterInstance() {
+ if (jobSubmitter == null) {
+ jobSubmitter = internalCreateJobSubmitter();
+ }
+ return jobSubmitter;
+ }
+
+ public synchronized JobMonitor getJobMonitorInstance() {
+ if (jobMonitor == null) {
+ jobMonitor = internalCreateJobMonitor();
+ }
+ return jobMonitor;
+ }
+
+ public synchronized SubmarineStorage getSubmarineStorage() {
+ if (submarineStorage == null) {
+ submarineStorage = internalCreateSubmarineStorage();
+ }
+ return submarineStorage;
+ }
+
+ @VisibleForTesting
+ public synchronized void setJobSubmitterInstance(JobSubmitter jobSubmitter) {
+ this.jobSubmitter = jobSubmitter;
+ }
+
+ @VisibleForTesting
+ public synchronized void setJobMonitorInstance(JobMonitor jobMonitor) {
+ this.jobMonitor = jobMonitor;
+ }
+
+ @VisibleForTesting
+ public synchronized void setSubmarineStorage(SubmarineStorage storage) {
+ this.submarineStorage = storage;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java
new file mode 100644
index 0000000..ebf9581
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+
+package org.apache.hadoop.yarn.submarine.runtimes.common;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+
+/**
+ * A super naive FS-based storage.
+ */
+public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
+ ClientContext clientContext;
+ RemoteDirectoryManager rdm;
+
+ public FSBasedSubmarineStorageImpl(ClientContext clientContext) {
+ this.clientContext = clientContext;
+ rdm = clientContext.getRemoteDirectoryManager();
+ }
+
+ @Override
+ public void addNewJob(String jobName, Map<String, String> jobInfo)
+ throws IOException {
+ Path jobInfoPath = getJobInfoPath(jobName, true);
+ FSDataOutputStream fos = rdm.getFileSystem().create(jobInfoPath);
+ serializeMap(fos, jobInfo);
+ }
+
+ @Override
+ public Map<String, String> getJobInfoByName(String jobName)
+ throws IOException {
+ Path jobInfoPath = getJobInfoPath(jobName, false);
+ FSDataInputStream fis = rdm.getFileSystem().open(jobInfoPath);
+ return deserializeMap(fis);
+ }
+
+ @Override
+ public void addNewModel(String modelName, String version,
+ Map<String, String> modelInfo) throws IOException {
+ Path modelInfoPath = getModelInfoPath(modelName, version, true);
+ FSDataOutputStream fos = rdm.getFileSystem().create(modelInfoPath);
+ serializeMap(fos, modelInfo);
+ }
+
+ @Override
+ public Map<String, String> getModelInfoByName(String modelName,
+ String version) throws IOException {
+ Path modelInfoPath = getModelInfoPath(modelName, version, false);
+ FSDataInputStream fis = rdm.getFileSystem().open(modelInfoPath);
+ return deserializeMap(fis);
+ }
+
+ private Path getModelInfoPath(String modelName, String version, boolean create)
+ throws IOException {
+ Path modelDir = rdm.getModelDir(modelName, create);
+ Path modelInfo = new Path(modelDir, version + ".info");
+ return modelInfo;
+ }
+
+ private void serializeMap(FSDataOutputStream fos, Map<String, String> map)
+ throws IOException {
+ ObjectOutput oo = new ObjectOutputStream(fos);
+ oo.writeObject(map);
+ oo.close();
+ }
+
+ private Map<String, String> deserializeMap(FSDataInputStream fis)
+ throws IOException {
+ ObjectInput oi = new ObjectInputStream(fis);
+ Map<String, String> newMap = null;
+ try {
+ newMap = (Map<String, String>) oi.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ return newMap;
+ }
+
+ private Path getJobInfoPath(String jobName, boolean create) throws IOException {
+ Path path = rdm.getJobStagingArea(jobName, create);
+ Path fileName = new Path(path, "job.info");
+ return fileName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java
new file mode 100644
index 0000000..c81393b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.common;
+
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.api.JobState;
+import org.apache.hadoop.yarn.submarine.common.api.JobStatus;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Monitor status of job(s)
+ */
+public abstract class JobMonitor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(JobMonitor.class);
+ protected ClientContext clientContext;
+
+ public JobMonitor(ClientContext clientContext) {
+ this.clientContext = clientContext;
+ }
+
+ /**
+ * Returns status of training job.
+ *
+ * @param jobName name of job
+ * @return job status
+ * @throws IOException anything else happens
+ * @throws YarnException anything related to YARN happens
+ */
+ public abstract JobStatus getTrainingJobStatus(String jobName)
+ throws IOException, YarnException;
+
+ /**
+ * Continue wait and print status if job goes to ready or final state.
+ * @param jobName
+ * @throws IOException
+ * @throws YarnException
+ * @throws SubmarineException
+ */
+ public void waitTrainingFinal(String jobName)
+ throws IOException, YarnException, SubmarineException {
+ // Wait 5 sec between each fetch.
+ int waitIntervalSec = 5;
+ JobStatus js;
+ while (true) {
+ js = getTrainingJobStatus(jobName);
+ JobState jobState = js.getState();
+ js.nicePrint(System.err);
+
+ if (JobState.isFinal(jobState)) {
+ if (jobState.equals(JobState.FAILED)) {
+ throw new SubmarineException("Job failed");
+ } else if (jobState.equals(JobState.KILLED)) {
+ throw new SubmarineException("Job killed");
+ }
+ LOG.info("Job exited with state=" + jobState);
+ break;
+ }
+
+ try {
+ Thread.sleep(waitIntervalSec * 1000);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobSubmitter.java
new file mode 100644
index 0000000..1749390
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobSubmitter.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.common;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
+
+import java.io.IOException;
+
/**
 * Submit job to cluster master.
 */
public interface JobSubmitter {
  /**
   * Submit job to cluster.
   * @param parameters run job parameters
   * @return applicationId when successfully submitted
   * @throws YarnException for issues while contacting YARN daemons
   * @throws IOException for other issues.
   */
  ApplicationId submitJob(RunJobParameters parameters)
      throws IOException, YarnException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/StorageKeyConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/StorageKeyConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/StorageKeyConstants.java
new file mode 100644
index 0000000..1fbbe7a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/StorageKeyConstants.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.common;
+
/**
 * Keys used when persisting job/model metadata maps in submarine storage.
 */
public final class StorageKeyConstants {
  public static final String JOB_NAME = "JOB_NAME";
  public static final String JOB_RUN_ARGS = "JOB_RUN_ARGS";
  public static final String APPLICATION_ID = "APPLICATION_ID";
  public static final String CHECKPOINT_PATH = "CHECKPOINT_PATH";
  public static final String INPUT_PATH = "INPUT_PATH";
  public static final String SAVED_MODEL_PATH = "SAVED_MODEL_PATH";

  private StorageKeyConstants() {
    // Constants holder: no instances.
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/SubmarineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/SubmarineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/SubmarineStorage.java
new file mode 100644
index 0000000..9c2004f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/SubmarineStorage.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.common;
+
+import java.io.IOException;
+import java.util.Map;
+
/**
 * Persistent job/model, etc.
 */
public abstract class SubmarineStorage {
  /**
   * Add a new job by name.
   * @param jobName name of job.
   * @param jobInfo info of the job.
   * @throws IOException on storage errors.
   */
  public abstract void addNewJob(String jobName, Map<String, String> jobInfo)
      throws IOException;

  /**
   * Get job info by job name.
   * @param jobName name of job
   * @return info of the job.
   * @throws IOException on storage errors.
   */
  public abstract Map<String, String> getJobInfoByName(String jobName)
      throws IOException;

  /**
   * Add a new model.
   * @param modelName name of model
   * @param version version of the model, when null is specified, it will be
   *                "default"
   * @param modelInfo info of the model.
   * @throws IOException on storage errors.
   */
  public abstract void addNewModel(String modelName, String version,
      Map<String, String> modelInfo) throws IOException;

  /**
   * Get model info by name and version.
   * @param modelName name of model.
   * @param version version of the model, when null is specified, it will be
   *                "default".
   * @return info of the model.
   * @throws IOException on storage errors.
   */
  public abstract Map<String, String> getModelInfoByName(String modelName, String version)
      throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java
new file mode 100644
index 0000000..94d30b0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.api.JobStatus;
+import org.apache.hadoop.yarn.submarine.common.api.builder.JobStatusBuilder;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
+
+import java.io.IOException;
+
+public class YarnServiceJobMonitor extends JobMonitor {
+ private ServiceClient serviceClient = null;
+
+ public YarnServiceJobMonitor(ClientContext clientContext) {
+ super(clientContext);
+ }
+
+ @Override
+ public synchronized JobStatus getTrainingJobStatus(String jobName)
+ throws IOException, YarnException {
+ if (this.serviceClient == null) {
+ this.serviceClient = YarnServiceUtils.createServiceClient(
+ clientContext.getYarnConfig());
+ }
+
+ Service serviceSpec = this.serviceClient.getStatus(jobName);
+ JobStatus jobStatus = JobStatusBuilder.fromServiceSpec(serviceSpec);
+ return jobStatus;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
new file mode 100644
index 0000000..3cd0d7e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
@@ -0,0 +1,458 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
+import org.apache.hadoop.yarn.service.api.records.Artifact;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile;
+import org.apache.hadoop.yarn.service.api.records.Resource;
+import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.common.Envs;
+import org.apache.hadoop.yarn.submarine.common.api.TaskType;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
+/**
+ * Submit a job to cluster
+ */
+public class YarnServiceJobSubmitter implements JobSubmitter {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(YarnServiceJobSubmitter.class);
+ ClientContext clientContext;
+ Service serviceSpec;
+ private Set<Path> uploadedFiles = new HashSet<>();
+
+ public YarnServiceJobSubmitter(ClientContext clientContext) {
+ this.clientContext = clientContext;
+ }
+
+ private Resource getServiceResourceFromYarnResource(
+ org.apache.hadoop.yarn.api.records.Resource yarnResource) {
+ Resource serviceResource = new Resource();
+ serviceResource.setCpus(yarnResource.getVirtualCores());
+ serviceResource.setMemory(String.valueOf(yarnResource.getMemorySize()));
+
+ Map<String, ResourceInformation> riMap = new HashMap<>();
+ for (org.apache.hadoop.yarn.api.records.ResourceInformation ri : yarnResource
+ .getAllResourcesListCopy()) {
+ ResourceInformation serviceRi =
+ new ResourceInformation();
+ serviceRi.setValue(ri.getValue());
+ serviceRi.setUnit(ri.getUnits());
+ riMap.put(ri.getName(), serviceRi);
+ }
+ serviceResource.setResourceInformations(riMap);
+
+ return serviceResource;
+ }
+
+ private String getValueOfEnvionment(String envar) {
+ // extract value from "key=value" form
+ if (envar == null || !envar.contains("=")) {
+ return "";
+ } else {
+ return envar.substring(envar.indexOf("=") + 1);
+ }
+ }
+
+ private void addHdfsClassPathIfNeeded(RunJobParameters parameters,
+ FileWriter fw, Component comp) throws IOException {
+ // Find envs to use HDFS
+ String hdfsHome = null;
+ String javaHome = null;
+
+ boolean hadoopEnv = false;
+
+ for (String envar : parameters.getEnvars()) {
+ if (envar.startsWith("DOCKER_HADOOP_HDFS_HOME=")) {
+ hdfsHome = getValueOfEnvionment(envar);
+ hadoopEnv = true;
+ } else if (envar.startsWith("DOCKER_JAVA_HOME=")) {
+ javaHome = getValueOfEnvionment(envar);
+ }
+ }
+
+ boolean lackingEnvs = false;
+
+ if ((parameters.getInputPath() != null && parameters.getInputPath()
+ .contains("hdfs://")) || (parameters.getCheckpointPath() != null
+ && parameters.getCheckpointPath().contains("hdfs://")) || (
+ parameters.getSavedModelPath() != null && parameters.getSavedModelPath()
+ .contains("hdfs://")) || hadoopEnv) {
+ // HDFS is asked either in input or output, set LD_LIBRARY_PATH
+ // and classpath
+
+ if (hdfsHome != null) {
+ // Unset HADOOP_HOME/HADOOP_YARN_HOME to make sure host machine's envs
+ // won't pollute docker's env.
+ fw.append("export HADOOP_HOME=\n");
+ fw.append("export HADOOP_YARN_HOME=\n");
+ fw.append("export HADOOP_HDFS_HOME=" + hdfsHome + "\n");
+ } else{
+ lackingEnvs = true;
+ }
+
+ // hadoop confs will be uploaded to HDFS and localized to container's
+ // local folder, so here set $HADOOP_CONF_DIR to $WORK_DIR.
+ fw.append("export HADOOP_CONF_DIR=$WORK_DIR\n");
+ if (javaHome != null) {
+ fw.append("export JAVA_HOME=" + javaHome + "\n");
+ fw.append("export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:"
+ + "$JAVA_HOME/lib/amd64/server\n");
+ } else {
+ lackingEnvs = true;
+ }
+ fw.append("export CLASSPATH=`$HADOOP_HDFS_HOME/bin/hadoop classpath --glob`\n");
+ }
+
+ if (lackingEnvs) {
+ LOG.error("When hdfs is being used to read/write models/data. Following"
+ + "envs are required: 1) DOCKER_HADOOP_HDFS_HOME=<HDFS_HOME inside"
+ + "docker container> 2) DOCKER_JAVA_HOME=<JAVA_HOME inside docker"
+ + "container>. You can use --env to pass these envars.");
+ throw new IOException("Failed to detect HDFS-related environments.");
+ }
+
+ // Trying to upload core-site.xml and hdfs-site.xml
+ Path stagingDir =
+ clientContext.getRemoteDirectoryManager().getJobStagingArea(
+ parameters.getName(), true);
+ File coreSite = findFileOnClassPath("core-site.xml");
+ File hdfsSite = findFileOnClassPath("hdfs-site.xml");
+ if (coreSite == null || hdfsSite == null) {
+ LOG.error("hdfs is being used, however we couldn't locate core-site.xml/"
+ + "hdfs-site.xml from classpath, please double check you classpath"
+ + "setting and make sure they're included.");
+ throw new IOException(
+ "Failed to locate core-site.xml / hdfs-site.xml from class path");
+ }
+ uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir,
+ coreSite.getAbsolutePath(), "core-site.xml", comp);
+ uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir,
+ hdfsSite.getAbsolutePath(), "hdfs-site.xml", comp);
+
+ // DEBUG
+ if (SubmarineLogs.isVerbose()) {
+ fw.append("echo $CLASSPATH\n");
+ fw.append("echo $JAVA_HOME\n");
+ fw.append("echo $LD_LIBRARY_PATH\n");
+ fw.append("echo $HADOOP_HDFS_HOME\n");
+ }
+ }
+
+ private void addCommonEnvironments(Component component, TaskType taskType) {
+ Map<String, String> envs = component.getConfiguration().getEnv();
+ envs.put(Envs.TASK_INDEX_ENV, ServiceApiConstants.COMPONENT_ID);
+ envs.put(Envs.TASK_TYPE_ENV, taskType.name());
+ }
+
+ /*
+ * Generate a command launch script on local disk, returns patch to the script
+ */
+ private String generateCommandLaunchScript(RunJobParameters parameters,
+ TaskType taskType, Component comp) throws IOException {
+ File file = File.createTempFile(taskType.name() + "-launch-script", ".sh");
+ FileWriter fw = new FileWriter(file);
+
+ fw.append("#!/bin/bash\n");
+
+ addHdfsClassPathIfNeeded(parameters, fw, comp);
+
+ // For primary_worker
+ if (taskType == TaskType.PRIMARY_WORKER) {
+ // Do we need tensorboard?
+ if (parameters.isTensorboardEnabled()) {
+ int tensorboardPort = 6006;
+ // Run tensorboard at the background
+ fw.append(
+ "tensorboard --port " + tensorboardPort + " --logdir " + parameters
+ .getCheckpointPath() + " &\n");
+ }
+ }
+
+ // When distributed training is required
+ if (parameters.isDistributed()) {
+ // Generated TF_CONFIG
+ String tfConfigEnv = YarnServiceUtils.getTFConfigEnv(
+ taskType.getComponentName(), parameters.getNumWorkers(),
+ parameters.getNumPS(), parameters.getName(),
+ System.getProperty("user.name"),
+ clientContext.getYarnConfig().get("hadoop.registry.dns.domain-name"));
+ fw.append("export TF_CONFIG=\"" + tfConfigEnv + "\"\n");
+ }
+
+ // Print launch command
+ if (taskType.equals(TaskType.WORKER) || taskType.equals(
+ TaskType.PRIMARY_WORKER)) {
+ fw.append(parameters.getWorkerLaunchCmd() + '\n');
+
+ if (SubmarineLogs.isVerbose()) {
+ LOG.info("Worker command =[" + parameters.getWorkerLaunchCmd() + "]");
+ }
+ } else if (taskType.equals(TaskType.PS)) {
+ fw.append(parameters.getPSLaunchCmd() + '\n');
+
+ if (SubmarineLogs.isVerbose()) {
+ LOG.info("PS command =[" + parameters.getPSLaunchCmd() + "]");
+ }
+ }
+
+ fw.close();
+ return file.getAbsolutePath();
+ }
+
+ private String getScriptFileName(TaskType taskType) {
+ return "run-" + taskType.name() + ".sh";
+ }
+
+ private File findFileOnClassPath(final String fileName) {
+ final String classpath = System.getProperty("java.class.path");
+ final String pathSeparator = System.getProperty("path.separator");
+ final StringTokenizer tokenizer = new StringTokenizer(classpath,
+ pathSeparator);
+
+ while (tokenizer.hasMoreTokens()) {
+ final String pathElement = tokenizer.nextToken();
+ final File directoryOrJar = new File(pathElement);
+ final File absoluteDirectoryOrJar = directoryOrJar.getAbsoluteFile();
+ if (absoluteDirectoryOrJar.isFile()) {
+ final File target = new File(absoluteDirectoryOrJar.getParent(),
+ fileName);
+ if (target.exists()) {
+ return target;
+ }
+ } else{
+ final File target = new File(directoryOrJar, fileName);
+ if (target.exists()) {
+ return target;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ private void uploadToRemoteFileAndLocalizeToContainerWorkDir(Path stagingDir,
+ String fileToUpload, String destFilename, Component comp)
+ throws IOException {
+ FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
+
+ // Upload to remote FS under staging area
+ File localFile = new File(fileToUpload);
+ if (!localFile.exists()) {
+ throw new FileNotFoundException(
+ "Trying to upload file=" + localFile.getAbsolutePath()
+ + " to remote, but couldn't find local file.");
+ }
+ String filename = new File(fileToUpload).getName();
+
+ Path uploadedFilePath = new Path(stagingDir, filename);
+ if (!uploadedFiles.contains(uploadedFilePath)) {
+ if (SubmarineLogs.isVerbose()) {
+ LOG.info("Copying local file=" + fileToUpload + " to remote="
+ + uploadedFilePath);
+ }
+ fs.copyFromLocalFile(new Path(fileToUpload), uploadedFilePath);
+ uploadedFiles.add(uploadedFilePath);
+ }
+
+ FileStatus fileStatus = fs.getFileStatus(uploadedFilePath);
+ LOG.info("Uploaded file path = " + fileStatus.getPath());
+
+ // Set it to component's files list
+ comp.getConfiguration().getFiles().add(new ConfigFile().srcFile(
+ fileStatus.getPath().toUri().toString()).destFile(destFilename)
+ .type(ConfigFile.TypeEnum.STATIC));
+ }
+
+ private void handleLaunchCommand(RunJobParameters parameters,
+ TaskType taskType, Component component) throws IOException {
+ // Get staging area directory
+ Path stagingDir =
+ clientContext.getRemoteDirectoryManager().getJobStagingArea(
+ parameters.getName(), true);
+
+ // Generate script file in the local disk
+ String localScriptFile = generateCommandLaunchScript(parameters, taskType,
+ component);
+ String destScriptFileName = getScriptFileName(taskType);
+ uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir, localScriptFile,
+ destScriptFileName, component);
+
+ component.setLaunchCommand("./" + destScriptFileName);
+ }
+
+ private void addWorkerComponent(Service service,
+ RunJobParameters parameters, TaskType taskType) throws IOException {
+ Component workerComponent = new Component();
+ addCommonEnvironments(workerComponent, taskType);
+
+ workerComponent.setName(taskType.getComponentName());
+
+ if (taskType.equals(TaskType.PRIMARY_WORKER)) {
+ workerComponent.setNumberOfContainers(1L);
+ } else{
+ workerComponent.setNumberOfContainers(
+ (long) parameters.getNumWorkers() - 1);
+ }
+
+ if (parameters.getWorkerDockerImage() != null) {
+ workerComponent.setArtifact(
+ getDockerArtifact(parameters.getWorkerDockerImage()));
+ }
+
+ workerComponent.setResource(
+ getServiceResourceFromYarnResource(parameters.getWorkerResource()));
+ handleLaunchCommand(parameters, taskType, workerComponent);
+ workerComponent.setRestartPolicy(Component.RestartPolicyEnum.NEVER);
+ service.addComponent(workerComponent);
+ }
+
+ // Handle worker and primary_worker.
+ private void addWorkerComponents(Service service, RunJobParameters parameters)
+ throws IOException {
+ addWorkerComponent(service, parameters, TaskType.PRIMARY_WORKER);
+
+ if (parameters.getNumWorkers() > 1) {
+ addWorkerComponent(service, parameters, TaskType.WORKER);
+ }
+ }
+
+ private void appendToEnv(Service service, String key, String value,
+ String delim) {
+ Map<String, String> env = service.getConfiguration().getEnv();
+ if (!env.containsKey(key)) {
+ env.put(key, value);
+ } else {
+ if (!value.isEmpty()) {
+ String existingValue = env.get(key);
+ if (!existingValue.endsWith(delim)) {
+ env.put(key, existingValue + delim + value);
+ } else {
+ env.put(key, existingValue + value);
+ }
+ }
+ }
+ }
+
+ private void handleServiceEnvs(Service service, RunJobParameters parameters) {
+ if (parameters.getEnvars() != null) {
+ for (String envarPair : parameters.getEnvars()) {
+ String key, value;
+ if (envarPair.contains("=")) {
+ int idx = envarPair.indexOf('=');
+ key = envarPair.substring(0, idx);
+ value = envarPair.substring(idx + 1);
+ } else{
+ // No "=" found so use the whole key
+ key = envarPair;
+ value = "";
+ }
+ appendToEnv(service, key, value, ":");
+ }
+ }
+
+ // Append other configs like /etc/passwd, /etc/krb5.conf
+ appendToEnv(service, "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS",
+ "/etc/passwd:/etc/passwd:ro", ",");
+
+ String authenication = clientContext.getYarnConfig().get(
+ HADOOP_SECURITY_AUTHENTICATION);
+ if (authenication != null && authenication.equals("kerberos")) {
+ appendToEnv(service, "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS",
+ "/etc/krb5.conf:/etc/krb5.conf:ro", ",");
+ }
+ }
+
+ private Artifact getDockerArtifact(String dockerImageName) {
+ return new Artifact().type(Artifact.TypeEnum.DOCKER).id(dockerImageName);
+ }
+
+ private Service createServiceByParameters(RunJobParameters parameters)
+ throws IOException {
+ Service service = new Service();
+ service.setName(parameters.getName());
+ service.setVersion(String.valueOf(System.currentTimeMillis()));
+ service.setArtifact(getDockerArtifact(parameters.getDockerImageName()));
+
+ handleServiceEnvs(service, parameters);
+
+ addWorkerComponents(service, parameters);
+
+ if (parameters.getNumPS() > 0) {
+ Component psComponent = new Component();
+ psComponent.setName(TaskType.PS.getComponentName());
+ addCommonEnvironments(psComponent, TaskType.PS);
+ psComponent.setNumberOfContainers((long) parameters.getNumPS());
+ psComponent.setRestartPolicy(Component.RestartPolicyEnum.NEVER);
+ psComponent.setResource(
+ getServiceResourceFromYarnResource(parameters.getPsResource()));
+
+ // Override global docker image if needed.
+ if (parameters.getPsDockerImage() != null) {
+ psComponent.setArtifact(
+ getDockerArtifact(parameters.getPsDockerImage()));
+ }
+ handleLaunchCommand(parameters, TaskType.PS, psComponent);
+ service.addComponent(psComponent);
+ }
+ return service;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ApplicationId submitJob(RunJobParameters parameters)
+ throws IOException, YarnException {
+ Service service = createServiceByParameters(parameters);
+ ServiceClient serviceClient = YarnServiceUtils.createServiceClient(
+ clientContext.getYarnConfig());
+ ApplicationId appid = serviceClient.actionCreate(service);
+ serviceClient.stop();
+ this.serviceSpec = service;
+ return appid;
+ }
+
+ @VisibleForTesting
+ public Service getServiceSpec() {
+ return serviceSpec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceRuntimeFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceRuntimeFactory.java
new file mode 100644
index 0000000..3489e49
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceRuntimeFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
+
+import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
+import org.apache.hadoop.yarn.submarine.runtimes.common.FSBasedSubmarineStorageImpl;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
+import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
+
+public class YarnServiceRuntimeFactory extends RuntimeFactory {
+
+ public YarnServiceRuntimeFactory(ClientContext clientContext) {
+ super(clientContext);
+ }
+
+ @Override
+ protected JobSubmitter internalCreateJobSubmitter() {
+ return new YarnServiceJobSubmitter(super.clientContext);
+ }
+
+ @Override
+ protected JobMonitor internalCreateJobMonitor() {
+ return new YarnServiceJobMonitor(super.clientContext);
+ }
+
+ @Override
+ protected SubmarineStorage internalCreateSubmarineStorage() {
+ return new FSBasedSubmarineStorageImpl(super.clientContext);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java
new file mode 100644
index 0000000..f7ecc97
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.submarine.common.Envs;
+
+public class YarnServiceUtils {
+ // This will be true only in UT.
+ private static ServiceClient stubServiceClient = null;
+
+ public static ServiceClient createServiceClient(
+ Configuration yarnConfiguration) {
+ if (stubServiceClient != null) {
+ return stubServiceClient;
+ }
+
+ ServiceClient serviceClient = new ServiceClient();
+ serviceClient.init(yarnConfiguration);
+ serviceClient.start();
+ return serviceClient;
+ }
+
+ @VisibleForTesting
+ public static void setStubServiceClient(ServiceClient stubServiceClient) {
+ YarnServiceUtils.stubServiceClient = stubServiceClient;
+ }
+
+ public static String getTFConfigEnv(String curCommponentName, int nWorkers,
+ int nPs, String serviceName, String userName, String domain) {
+ String commonEndpointSuffix =
+ "." + serviceName + "." + userName + "." + domain + ":8000";
+
+ String json = "{\\\"cluster\\\":{";
+
+ String master = getComponentArrayJson("master", 1, commonEndpointSuffix)
+ + ",";
+ String worker = getComponentArrayJson("worker", nWorkers - 1,
+ commonEndpointSuffix) + ",";
+ String ps = getComponentArrayJson("ps", nPs, commonEndpointSuffix) + "},";
+
+ String task =
+ "\\\"task\\\":{" + " \\\"type\\\":\\\"" + curCommponentName + "\\\","
+ + " \\\"index\\\":" + '$' + Envs.TASK_INDEX_ENV + "},";
+ String environment = "\\\"environment\\\":\\\"cloud\\\"}";
+
+ return json + master + worker + ps + task + environment;
+ }
+
+ private static String getComponentArrayJson(String componentName, int count,
+ String endpointSuffix) {
+ String component = "\\\"" + componentName + "\\\":";
+ String array = "[";
+ for (int i = 0; i < count; i++) {
+ array = array + "\\\"" + componentName + "-" + i
+ + endpointSuffix + "\\\"";
+ if (i != count - 1) {
+ array = array + ",";
+ }
+ }
+ array = array + "]";
+ return component + array;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/DeveloperGuide.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/DeveloperGuide.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/DeveloperGuide.md
new file mode 100644
index 0000000..ce26ea7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/DeveloperGuide.md
@@ -0,0 +1,26 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+# Developer Guide
+
+(Need add more details)
+
+By default, submarine uses the YARN service framework as its runtime. If you want to add your own implementation, you can add a new `RuntimeFactory` implementation and configure the following option in `submarine.xml` (which should be placed under the same `$HADOOP_CONF_DIR`)
+
+```
+<property>
+ <name>submarine.runtime.class</name>
+ <value>... full qualified class name for your runtime factory ... </value>
+</property>
+```
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/QuickStart.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/QuickStart.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/QuickStart.md
new file mode 100644
index 0000000..b720b5a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/QuickStart.md
@@ -0,0 +1,134 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+# Quick Start Guide
+
+## Prerequisite
+
+Must:
+- Apache Hadoop 3.1.0, YARN service enabled.
+
+Optional:
+- Enable YARN DNS. (When distributed training required.)
+- Enable GPU on YARN support. (When GPU-based training required.)
+
+## Run jobs
+
+### Commandline options
+
+```$xslt
+usage: job run
+ -checkpoint_path <arg> Training output directory of the job, could
+ be local or other FS directory. This
+ typically includes checkpoint files and
+ exported model
+ -docker_image <arg> Docker image name/tag
+ -env <arg> Common environment variable of worker/ps
+ -input_path <arg> Input of the job, could be local or other FS
+ directory
+ -name <arg> Name of the job
+ -num_ps <arg> Number of PS tasks of the job, by default
+ it's 0
+ -num_workers <arg>         Number of worker tasks of the job, by
+ default it's 1
+ -ps_docker_image <arg> Specify docker image for PS, when this is
+ not specified, PS uses --docker_image as
+ default.
+ -ps_launch_cmd <arg>       Commandline of PS, arguments will be
+ directly used to launch the PS
+ -ps_resources <arg> Resource of each PS, for example
+ memory-mb=2048,vcores=2,yarn.io/gpu=2
+ -queue <arg> Name of queue to run the job, by default it
+ uses default queue
+ -saved_model_path <arg> Model exported path (savedmodel) of the job,
+ which is needed when exported model is not
+ placed under ${checkpoint_path}, could be
+ local or other FS directory. This will be
+ used to serve.
+ -tensorboard <arg> Should we run TensorBoard for this job? By
+ default it's true
+ -verbose Print verbose log for troubleshooting
+ -wait_job_finish Specified when user want to wait the job
+ finish
+ -worker_docker_image <arg> Specify docker image for WORKER, when this
+ is not specified, WORKER uses --docker_image
+ as default.
+ -worker_launch_cmd <arg> Commandline of worker, arguments will be
+ directly used to launch the worker
+ -worker_resources <arg> Resource of each worker, for example
+ memory-mb=2048,vcores=2,yarn.io/gpu=2
+```
+
+### Launch Standalone Tensorflow Application:
+
+#### Commandline
+```
+yarn jar path-to/hadoop-yarn-applications-submarine-3.2.0-SNAPSHOT.jar job run \
+ --env DOCKER_JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre/ \
+ --env DOCKER_HADOOP_HDFS_HOME=/hadoop-3.1.0 --name tf-job-001 \
+ --docker_image <your-docker-image> \
+ --input_path hdfs://default/dataset/cifar-10-data \
+ --checkpoint_path hdfs://default/tmp/cifar-10-jobdir \
+ --worker_resources memory=4G,vcores=2,gpu=2 \
+ --worker_launch_cmd "python ... (Your training application cmd)"
+```
+
+#### Notes:
+
+1) `DOCKER_JAVA_HOME` points to JAVA_HOME inside Docker image.
+2) `DOCKER_HADOOP_HDFS_HOME` points to HADOOP_HDFS_HOME inside Docker image.
+3) `--worker_resources` can include gpu when you need GPU to train your task.
+
+### Launch Distributed Tensorflow Application:
+
+#### Commandline
+
+```
+yarn jar hadoop-yarn-applications-submarine-<version>.jar job run \
+ --name tf-job-001 --docker_image <your docker image> \
+ --input_path hdfs://default/dataset/cifar-10-data \
+ --checkpoint_path hdfs://default/tmp/cifar-10-jobdir \
+ --env DOCKER_JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre/ \
+ --env DOCKER_HADOOP_HDFS_HOME=/hadoop-3.1.0 \
+ --num_workers 2 \
+ --worker_resources memory=8G,vcores=2,gpu=1 --worker_launch_cmd "cmd for worker ..." \
+ --num_ps 2 \
+ --ps_resources memory=4G,vcores=2,gpu=0 --ps_launch_cmd "cmd for ps" \
+```
+
+#### Notes:
+
+1) Very similar to standalone TF application, but you need to specify #worker/#ps
+2) Different resources can be specified for worker and PS.
+3) `TF_CONFIG` environment will be auto generated and set before executing user's launch command.
+
+## Get job information
+
+### Get Job Status
+
+```
+yarn jar hadoop-yarn-applications-submarine-3.2.0-SNAPSHOT.jar job show --name tf-job-001
+```
+
+Output looks like:
+```
+Job Meta Info:
+ Application Id: application_1532131617202_0005
+ Input Path: hdfs://default/dataset/cifar-10-data
+ Checkpoint Path: hdfs://default/tmp/cifar-10-jobdir
+ Run Parameters: --name tf-job-001 --docker_image wtan/tf-1.8.0-gpu:0.0.3
+ (... all your commandline before run the job)
+```
+
+After that, you can run ```tensorboard --logdir=<checkpoint-path>``` to view Tensorboard of the job.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
new file mode 100644
index 0000000..295d6a8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.submarine.client.cli;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
+import org.apache.hadoop.yarn.submarine.common.MockClientContext;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
+import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@code RunJobCli} argument parsing. Verifies that
+ * command-line options are translated into {@code RunJobParameters} and
+ * that resource strings (memory/vcores/gpu) are parsed as expected.
+ * All collaborators (submitter, monitor, storage) are Mockito mocks, so
+ * no cluster is required.
+ */
+public class TestRunJobCliParsing {
+ @Before
+ public void before() {
+ // Reset the global verbose flag so each test starts from a known state.
+ SubmarineLogs.verboseOff();
+ }
+
+ @Test
+ public void testPrintHelp() {
+ // Smoke test: printing usage must not throw, even with mocked deps.
+ MockClientContext mockClientContext = new MockClientContext();
+ JobSubmitter mockJobSubmitter = mock(JobSubmitter.class);
+ JobMonitor mockJobMonitor = mock(JobMonitor.class);
+ RunJobCli runJobCli = new RunJobCli(mockClientContext, mockJobSubmitter,
+ mockJobMonitor);
+ runJobCli.printUsages();
+ }
+
+ /**
+ * Builds a client context whose runtime factory returns mocked
+ * submitter/monitor/storage; submitJob always yields a fixed app id.
+ */
+ private MockClientContext getMockClientContext()
+ throws IOException, YarnException {
+ MockClientContext mockClientContext = new MockClientContext();
+ JobSubmitter mockJobSubmitter = mock(JobSubmitter.class);
+ when(mockJobSubmitter.submitJob(any(RunJobParameters.class))).thenReturn(
+ ApplicationId.newInstance(1234L, 1));
+ JobMonitor mockJobMonitor = mock(JobMonitor.class);
+ SubmarineStorage storage = mock(SubmarineStorage.class);
+ RuntimeFactory rtFactory = mock(RuntimeFactory.class);
+
+ when(rtFactory.getJobSubmitterInstance()).thenReturn(mockJobSubmitter);
+ when(rtFactory.getJobMonitorInstance()).thenReturn(mockJobMonitor);
+ when(rtFactory.getSubmarineStorage()).thenReturn(storage);
+
+ mockClientContext.setRuntimeFactory(rtFactory);
+ return mockClientContext;
+ }
+
+ @Test
+ public void testBasicRunJobForDistributedTraining() throws Exception {
+ RunJobCli runJobCli = new RunJobCli(getMockClientContext());
+
+ Assert.assertFalse(SubmarineLogs.isVerbose());
+
+ // Distributed job: 3 workers + 2 PS, mixed memory unit styles (M/G).
+ runJobCli.run(
+ new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
+ "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+ "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
+ "--ps_resources", "memory=4G,vcores=4", "--tensorboard", "true",
+ "--ps_launch_cmd", "python run-ps.py", "--verbose" });
+
+ RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
+
+ // NOTE(review): several assertions below pass the actual value first
+ // and the expected value second; JUnit compares either way, but failure
+ // messages will have expected/actual swapped.
+ Assert.assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
+ Assert.assertEquals(jobRunParameters.getCheckpointPath(), "hdfs://output");
+ Assert.assertEquals(jobRunParameters.getNumPS(), 2);
+ Assert.assertEquals(jobRunParameters.getPSLaunchCmd(), "python run-ps.py");
+ Assert.assertEquals(Resources.createResource(4096, 4),
+ jobRunParameters.getPsResource());
+ Assert.assertEquals(jobRunParameters.getWorkerLaunchCmd(),
+ "python run-job.py");
+ Assert.assertEquals(Resources.createResource(2048, 2),
+ jobRunParameters.getWorkerResource());
+ Assert.assertEquals(jobRunParameters.getDockerImageName(),
+ "tf-docker:1.1.0");
+ // "--verbose" on the command line must flip the global flag on.
+ Assert.assertTrue(SubmarineLogs.isVerbose());
+ }
+
+ @Test
+ public void testBasicRunJobForSingleNodeTraining() throws Exception {
+ RunJobCli runJobCli = new RunJobCli(getMockClientContext());
+ Assert.assertFalse(SubmarineLogs.isVerbose());
+
+ // Single-node job: one worker, no PS options; lowercase "4g" memory
+ // unit must parse the same as "4G".
+ runJobCli.run(
+ new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
+ "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
+ "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
+ "true", "--verbose", "--wait_job_finish" });
+
+ RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
+
+ Assert.assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
+ Assert.assertEquals(jobRunParameters.getCheckpointPath(), "hdfs://output");
+ Assert.assertEquals(jobRunParameters.getNumWorkers(), 1);
+ Assert.assertEquals(jobRunParameters.getWorkerLaunchCmd(),
+ "python run-job.py");
+ Assert.assertEquals(Resources.createResource(4096, 2),
+ jobRunParameters.getWorkerResource());
+ Assert.assertTrue(SubmarineLogs.isVerbose());
+ Assert.assertTrue(jobRunParameters.isWaitJobFinish());
+ }
+
+ @Test
+ public void testLaunchCommandPatternReplace() throws Exception {
+ RunJobCli runJobCli = new RunJobCli(getMockClientContext());
+ Assert.assertFalse(SubmarineLogs.isVerbose());
+
+ // %input_path%, %checkpoint_path% and %saved_model_path% placeholders
+ // in launch commands must be substituted with the CLI option values.
+ runJobCli.run(
+ new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
+ "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+ "python run-job.py --input=%input_path% --model_dir=%checkpoint_path% --export_dir=%saved_model_path%/savedmodel",
+ "--worker_resources", "memory=2048,vcores=2", "--ps_resources",
+ "memory=4096,vcores=4", "--tensorboard", "true", "--ps_launch_cmd",
+ "python run-ps.py --input=%input_path% --model_dir=%checkpoint_path%/model",
+ "--verbose" });
+
+ Assert.assertEquals(
+ "python run-job.py --input=hdfs://input --model_dir=hdfs://output "
+ + "--export_dir=hdfs://output/savedmodel",
+ runJobCli.getRunJobParameters().getWorkerLaunchCmd());
+ Assert.assertEquals(
+ "python run-ps.py --input=hdfs://input --model_dir=hdfs://output/model",
+ runJobCli.getRunJobParameters().getPSLaunchCmd());
+ }
+
+ @Test
+ public void testResourceUnitParsing() throws Exception {
+ // "memory" with g/G/m/M suffixes normalizes to MB.
+ Resource res = CliUtils.createResourceFromString("memory=20g,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
+
+ res = CliUtils.createResourceFromString("memory=20G,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
+
+ res = CliUtils.createResourceFromString("memory=20M,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20, 3), res);
+
+ res = CliUtils.createResourceFromString("memory=20m,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20, 3), res);
+
+ // "memory-mb" (the canonical YARN key) is already in MB.
+ res = CliUtils.createResourceFromString("memory-mb=20,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20, 3), res);
+
+ res = CliUtils.createResourceFromString("memory-mb=20m,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20, 3), res);
+
+ res = CliUtils.createResourceFromString("memory-mb=20G,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
+
+ // Without a unit, "memory=20" is taken in the smallest unit and rounds
+ // down to 0 MB. NOTE(review): the original comment said "bits"; likely
+ // bytes — confirm against YARN's unit-conversion semantics.
+ res = CliUtils.createResourceFromString("memory=20,vcores=3",
+ ResourceUtils.getResourcesTypeInfo());
+ Assert.assertEquals(Resources.createResource(0, 3), res);
+
+ // Test multiple resources: register the GPU resource type, then parse
+ // it both by its short name ("gpu") and its full URI ("yarn.io/gpu").
+ List<ResourceTypeInfo> resTypes = new ArrayList<>(
+ ResourceUtils.getResourcesTypeInfo());
+ resTypes.add(ResourceTypeInfo.newInstance(ResourceInformation.GPU_URI, ""));
+ ResourceUtils.reinitializeResources(resTypes);
+ res = CliUtils.createResourceFromString("memory=2G,vcores=3,gpu=0",
+ resTypes);
+ Assert.assertEquals(2 * 1024, res.getMemorySize());
+ Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
+
+ res = CliUtils.createResourceFromString("memory=2G,vcores=3,gpu=3",
+ resTypes);
+ Assert.assertEquals(2 * 1024, res.getMemorySize());
+ Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));
+
+ // Omitting gpu entirely must default its value to 0.
+ res = CliUtils.createResourceFromString("memory=2G,vcores=3",
+ resTypes);
+ Assert.assertEquals(2 * 1024, res.getMemorySize());
+ Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
+
+ res = CliUtils.createResourceFromString("memory=2G,vcores=3,yarn.io/gpu=0",
+ resTypes);
+ Assert.assertEquals(2 * 1024, res.getMemorySize());
+ Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
+
+ res = CliUtils.createResourceFromString("memory=2G,vcores=3,yarn.io/gpu=3",
+ resTypes);
+ Assert.assertEquals(2 * 1024, res.getMemorySize());
+ Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));
+
+ // TODO, add more negative tests.
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestShowJobCliParsing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestShowJobCliParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestShowJobCliParsing.java
new file mode 100644
index 0000000..9c0d872
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestShowJobCliParsing.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.submarine.client.cli;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.submarine.client.cli.param.ShowJobParameters;
+import org.apache.hadoop.yarn.submarine.common.MockClientContext;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException;
+import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
+import org.apache.hadoop.yarn.submarine.runtimes.common.MemorySubmarineStorage;
+import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants;
+import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@code ShowJobCli} argument parsing and the "job show"
+ * flow against an in-memory {@code SubmarineStorage}.
+ */
+public class TestShowJobCliParsing {
+ @Before
+ public void before() {
+ // Reset the global verbose flag so each test starts from a known state.
+ SubmarineLogs.verboseOff();
+ }
+
+ @Test
+ public void testPrintHelp() {
+ // Smoke test: printing usage must not throw.
+ MockClientContext mockClientContext = new MockClientContext();
+ ShowJobCli showJobCli = new ShowJobCli(mockClientContext);
+ showJobCli.printUsages();
+ }
+
+ @Test
+ public void testShowJob()
+ throws InterruptedException, SubmarineException, YarnException,
+ ParseException, IOException {
+ MockClientContext mockClientContext = new MockClientContext();
+ // Override getAndPrintJobInfo so only option parsing is exercised,
+ // without touching any storage backend.
+ ShowJobCli showJobCli = new ShowJobCli(mockClientContext) {
+ @Override
+ protected void getAndPrintJobInfo() {
+ // do nothing
+ }
+ };
+ showJobCli.run(new String[] { "--name", "my-job" });
+ ShowJobParameters parameters = showJobCli.getParameters();
+ Assert.assertEquals(parameters.getName(), "my-job");
+ }
+
+ /** Builds a minimal job-info map keyed by {@code StorageKeyConstants}. */
+ private Map<String, String> getMockJobInfo(String jobName) {
+ Map<String, String> map = new HashMap<>();
+ map.put(StorageKeyConstants.APPLICATION_ID,
+ ApplicationId.newInstance(1234L, 1).toString());
+ map.put(StorageKeyConstants.JOB_RUN_ARGS, "job run 123456");
+ map.put(StorageKeyConstants.INPUT_PATH, "hdfs://" + jobName);
+ return map;
+ }
+
+ @Test
+ public void testSimpleShowJob()
+ throws InterruptedException, SubmarineException, YarnException,
+ ParseException, IOException {
+ SubmarineStorage storage = new MemorySubmarineStorage();
+ MockClientContext mockClientContext = new MockClientContext();
+ RuntimeFactory runtimeFactory = mock(RuntimeFactory.class);
+ when(runtimeFactory.getSubmarineStorage()).thenReturn(storage);
+ mockClientContext.setRuntimeFactory(runtimeFactory);
+
+ ShowJobCli showJobCli = new ShowJobCli(mockClientContext);
+
+ // The job is not in storage yet, so run() is expected to fail with
+ // an IOException. NOTE(review): if no exception is thrown, this
+ // block passes silently; an Assert.fail() after run() would make the
+ // expectation explicit.
+ try {
+ showJobCli.run(new String[] { "--name", "my-job" });
+ } catch (IOException e) {
+ // expected
+ }
+
+
+ // Once the job is stored, the same command must succeed.
+ storage.addNewJob("my-job", getMockJobInfo("my-job"));
+ showJobCli.run(new String[] { "--name", "my-job" });
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
new file mode 100644
index 0000000..e1756b8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli.yarnservice;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli;
+import org.apache.hadoop.yarn.submarine.common.MockClientContext;
+import org.apache.hadoop.yarn.submarine.common.api.TaskType;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
+import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants;
+import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
+import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceJobSubmitter;
+import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests of {@code RunJobCli} against the YARN-service runtime, with
+ * {@code ServiceClient} stubbed so no real cluster is contacted.
+ * Verifies the generated Service spec (components, resources, per-role
+ * docker images) and that job parameters are persisted to storage.
+ */
+public class TestYarnServiceRunJobCli {
+ @Before
+ public void before() throws IOException, YarnException {
+ SubmarineLogs.verboseOff();
+ // Stub the service client so actionCreate() returns a fixed app id
+ // instead of submitting to a real YARN cluster.
+ ServiceClient serviceClient = mock(ServiceClient.class);
+ when(serviceClient.actionCreate(any(Service.class))).thenReturn(
+ ApplicationId.newInstance(1234L, 1));
+ YarnServiceUtils.setStubServiceClient(serviceClient);
+ }
+
+ @Test
+ public void testPrintHelp() {
+ // Smoke test: printing usage must not throw.
+ MockClientContext mockClientContext =
+ YarnServiceCliTestUtils.getMockClientContext();
+ RunJobCli runJobCli = new RunJobCli(mockClientContext);
+ runJobCli.printUsages();
+ }
+
+ /** Extracts the generated YARN Service spec from the job submitter. */
+ private Service getServiceSpecFromJobSubmitter(JobSubmitter jobSubmitter) {
+ return ((YarnServiceJobSubmitter) jobSubmitter).getServiceSpec();
+ }
+
+ @Test
+ public void testBasicRunJobForDistributedTraining() throws Exception {
+ MockClientContext mockClientContext =
+ YarnServiceCliTestUtils.getMockClientContext();
+ RunJobCli runJobCli = new RunJobCli(mockClientContext);
+ Assert.assertFalse(SubmarineLogs.isVerbose());
+
+ // Distributed job with distinct docker images for PS and workers.
+ runJobCli.run(
+ new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "s3://input", "--checkpoint_path",
+ "s3://output", "--num_workers", "3", "--num_ps", "2",
+ "--worker_launch_cmd", "python run-job.py", "--worker_resources",
+ "memory=2048M,vcores=2", "--ps_resources", "memory=4096M,vcores=4",
+ "--tensorboard", "true", "--ps_docker_image", "ps.image",
+ "--worker_docker_image", "worker.image",
+ "--ps_launch_cmd", "python run-ps.py", "--verbose" });
+ Service serviceSpec = getServiceSpecFromJobSubmitter(
+ runJobCli.getJobSubmitter());
+ // Expect exactly three components: PRIMARY_WORKER, WORKER and PS.
+ Assert.assertEquals(3, serviceSpec.getComponents().size());
+ Assert.assertTrue(
+ serviceSpec.getComponent(TaskType.WORKER.getComponentName()) != null);
+ Assert.assertTrue(
+ serviceSpec.getComponent(TaskType.PRIMARY_WORKER.getComponentName())
+ != null);
+ Assert.assertTrue(
+ serviceSpec.getComponent(TaskType.PS.getComponentName()) != null);
+ // The primary worker gets the same resources as regular workers.
+ Component primaryWorkerComp = serviceSpec.getComponent(
+ TaskType.PRIMARY_WORKER.getComponentName());
+ Assert.assertEquals(2048, primaryWorkerComp.getResource().calcMemoryMB());
+ Assert.assertEquals(2,
+ primaryWorkerComp.getResource().getCpus().intValue());
+
+ Component workerComp = serviceSpec.getComponent(
+ TaskType.WORKER.getComponentName());
+ Assert.assertEquals(2048, workerComp.getResource().calcMemoryMB());
+ Assert.assertEquals(2, workerComp.getResource().getCpus().intValue());
+
+ Component psComp = serviceSpec.getComponent(TaskType.PS.getComponentName());
+ Assert.assertEquals(4096, psComp.getResource().calcMemoryMB());
+ Assert.assertEquals(4, psComp.getResource().getCpus().intValue());
+
+ // Per-role docker images override the job-level "--docker_image".
+ Assert.assertEquals("worker.image", workerComp.getArtifact().getId());
+ Assert.assertEquals("ps.image", psComp.getArtifact().getId());
+
+ Assert.assertTrue(SubmarineLogs.isVerbose());
+
+ // TODO, ADD TEST TO USE SERVICE CLIENT TO VALIDATE THE JSON SPEC
+ }
+
+ @Test
+ public void testBasicRunJobForSingleNodeTraining() throws Exception {
+ MockClientContext mockClientContext =
+ YarnServiceCliTestUtils.getMockClientContext();
+ RunJobCli runJobCli = new RunJobCli(mockClientContext);
+ Assert.assertFalse(SubmarineLogs.isVerbose());
+
+ // Single worker => the spec contains only the PRIMARY_WORKER component.
+ runJobCli.run(
+ new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "s3://input", "--checkpoint_path",
+ "s3://output", "--num_workers", "1", "--worker_launch_cmd",
+ "python run-job.py", "--worker_resources", "memory=2G,vcores=2",
+ "--tensorboard", "true", "--verbose" });
+ Service serviceSpec = getServiceSpecFromJobSubmitter(
+ runJobCli.getJobSubmitter());
+ Assert.assertEquals(1, serviceSpec.getComponents().size());
+ Assert.assertTrue(
+ serviceSpec.getComponent(TaskType.PRIMARY_WORKER.getComponentName())
+ != null);
+ Component primaryWorkerComp = serviceSpec.getComponent(
+ TaskType.PRIMARY_WORKER.getComponentName());
+ Assert.assertEquals(2048, primaryWorkerComp.getResource().calcMemoryMB());
+ Assert.assertEquals(2,
+ primaryWorkerComp.getResource().getCpus().intValue());
+
+ Assert.assertTrue(SubmarineLogs.isVerbose());
+
+ // TODO, ADD TEST TO USE SERVICE CLIENT TO VALIDATE THE JSON SPEC
+ }
+
+ @Test
+ public void testParameterStorageForTrainingJob() throws Exception {
+ MockClientContext mockClientContext =
+ YarnServiceCliTestUtils.getMockClientContext();
+ RunJobCli runJobCli = new RunJobCli(mockClientContext);
+ Assert.assertFalse(SubmarineLogs.isVerbose());
+
+ // Submitting a job must also persist its parameters to storage so
+ // that "job show" can retrieve them later.
+ runJobCli.run(
+ new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "s3://input", "--checkpoint_path",
+ "s3://output", "--num_workers", "1", "--worker_launch_cmd",
+ "python run-job.py", "--worker_resources", "memory=2G,vcores=2",
+ "--tensorboard", "true", "--verbose" });
+ SubmarineStorage storage =
+ mockClientContext.getRuntimeFactory().getSubmarineStorage();
+ Map<String, String> jobInfo = storage.getJobInfoByName("my-job");
+ Assert.assertTrue(jobInfo.size() > 0);
+ Assert.assertEquals(jobInfo.get(StorageKeyConstants.INPUT_PATH),
+ "s3://input");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org