You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/03/21 20:45:42 UTC

airavata git commit: Add task creation logic for unicore jobs

Repository: airavata
Updated Branches:
  refs/heads/develop 8496ae6c1 -> 6d320248f


Add task creation logic for unicore jobs


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6d320248
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6d320248
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6d320248

Branch: refs/heads/develop
Commit: 6d320248f7a0c12b651e55aa520503ca08a04d2f
Parents: 8496ae6
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Mar 21 15:45:38 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Mar 21 15:45:38 2016 -0400

----------------------------------------------------------------------
 .../cpi/impl/SimpleOrchestratorImpl.java        | 60 +++++++++++---------
 1 file changed, 34 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/6d320248/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index ca7be48..427be5b 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -283,36 +283,43 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
                 throw new OrchestratorException("Compute Resource Id cannot be null at this point");
             }
             ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
-
+            JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId);
             List<String> taskIdList = createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog);
-            taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
 
-            JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId);
-            if (autoSchedule) {
-                List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
-                for (BatchQueue batchQueue : definedBatchQueues) {
-                    if (batchQueue.getQueueName().equals(userGivenQueueName)) {
-                        int maxRunTime = batchQueue.getMaxRunTime();
-                        if (maxRunTime < userGivenWallTime) {
-                            resourceSchedule.setWallTimeLimit(maxRunTime);
-                            // need to create more job submissions
-                            int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime / maxRunTime));
-                            for (int i = 1; i <= numOfMaxWallTimeJobs; i++) {
-                                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, maxRunTime));
-                            }
-                            int leftWallTime = userGivenWallTime % maxRunTime;
-                            if (leftWallTime != 0) {
-                                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, leftWallTime));
+            ComputeResourcePreference resourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+            if (resourcePreference.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.UNICORE) {
+                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, userGivenWallTime));
+            } else {
+                taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
+
+                if (autoSchedule) {
+                    List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
+                    for (BatchQueue batchQueue : definedBatchQueues) {
+                        if (batchQueue.getQueueName().equals(userGivenQueueName)) {
+                            int maxRunTime = batchQueue.getMaxRunTime();
+                            if (maxRunTime < userGivenWallTime) {
+                                resourceSchedule.setWallTimeLimit(maxRunTime);
+                                // need to create more job submissions
+                                int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime / maxRunTime));
+                                for (int i = 1; i <= numOfMaxWallTimeJobs; i++) {
+                                    taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, maxRunTime));
+                                }
+                                int leftWallTime = userGivenWallTime % maxRunTime;
+                                if (leftWallTime != 0) {
+                                    taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, leftWallTime));
+                                }
+                            } else {
+                                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
                             }
-                        } else {
-                            taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
                         }
                     }
+                } else {
+                    taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
                 }
-            } else {
-                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
+
+                taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId));
             }
-            taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId));
+
             // update process scheduling
             experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processModel.getProcessId());
             return getTaskDag(taskIdList);
@@ -466,7 +473,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         if (jobSubmissionProtocol == JobSubmissionProtocol.SSH || jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
             SSHJobSubmission sshJobSubmission = OrchestratorUtils.getSSHJobSubmission(orchestratorContext, jobSubmissionInterface.getJobSubmissionInterfaceId());
             monitorMode = sshJobSubmission.getMonitorMode();
-        }else {
+        } else if (jobSubmissionProtocol == JobSubmissionProtocol.UNICORE) {
+            monitorMode = MonitorMode.FORK;
+        } else {
             logger.error("expId : {}, processId : {} :- Unsupported Job submission protocol {}.",
                     processModel.getExperimentId(), processModel.getProcessId(), jobSubmissionProtocol.name());
             throw new OrchestratorException("Unsupported Job Submission Protocol " + jobSubmissionProtocol.name());
@@ -482,8 +491,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
         JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel();
         submissionSubTask.setMonitorMode(monitorMode);
-        submissionSubTask.setJobSubmissionProtocol(
-                OrchestratorUtils.getPreferredJobSubmissionProtocol(orchestratorContext, processModel, gatewayId));
+        submissionSubTask.setJobSubmissionProtocol(jobSubmissionProtocol);
         submissionSubTask.setWallTime(wallTime);
         byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask);
         taskModel.setSubTaskModel(bytes);