You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/03/21 20:45:42 UTC
airavata git commit: Add task creation logic for UNICORE jobs
Repository: airavata
Updated Branches:
refs/heads/develop 8496ae6c1 -> 6d320248f
Add task creation logic for UNICORE jobs
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6d320248
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6d320248
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6d320248
Branch: refs/heads/develop
Commit: 6d320248f7a0c12b651e55aa520503ca08a04d2f
Parents: 8496ae6
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Mar 21 15:45:38 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Mar 21 15:45:38 2016 -0400
----------------------------------------------------------------------
.../cpi/impl/SimpleOrchestratorImpl.java | 60 +++++++++++---------
1 file changed, 34 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/6d320248/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index ca7be48..427be5b 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -283,36 +283,43 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
throw new OrchestratorException("Compute Resource Id cannot be null at this point");
}
ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
-
+ JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId);
List<String> taskIdList = createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog);
- taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
- JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId);
- if (autoSchedule) {
- List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
- for (BatchQueue batchQueue : definedBatchQueues) {
- if (batchQueue.getQueueName().equals(userGivenQueueName)) {
- int maxRunTime = batchQueue.getMaxRunTime();
- if (maxRunTime < userGivenWallTime) {
- resourceSchedule.setWallTimeLimit(maxRunTime);
- // need to create more job submissions
- int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime / maxRunTime));
- for (int i = 1; i <= numOfMaxWallTimeJobs; i++) {
- taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, maxRunTime));
- }
- int leftWallTime = userGivenWallTime % maxRunTime;
- if (leftWallTime != 0) {
- taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, leftWallTime));
+ ComputeResourcePreference resourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+ if (resourcePreference.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.UNICORE) {
+ taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, userGivenWallTime));
+ } else {
+ taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
+
+ if (autoSchedule) {
+ List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
+ for (BatchQueue batchQueue : definedBatchQueues) {
+ if (batchQueue.getQueueName().equals(userGivenQueueName)) {
+ int maxRunTime = batchQueue.getMaxRunTime();
+ if (maxRunTime < userGivenWallTime) {
+ resourceSchedule.setWallTimeLimit(maxRunTime);
+ // need to create more job submissions
+ int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime / maxRunTime));
+ for (int i = 1; i <= numOfMaxWallTimeJobs; i++) {
+ taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, maxRunTime));
+ }
+ int leftWallTime = userGivenWallTime % maxRunTime;
+ if (leftWallTime != 0) {
+ taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, leftWallTime));
+ }
+ } else {
+ taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
}
- } else {
- taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
}
}
+ } else {
+ taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
}
- } else {
- taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
+
+ taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId));
}
- taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId));
+
// update process scheduling
experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processModel.getProcessId());
return getTaskDag(taskIdList);
@@ -466,7 +473,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
if (jobSubmissionProtocol == JobSubmissionProtocol.SSH || jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
SSHJobSubmission sshJobSubmission = OrchestratorUtils.getSSHJobSubmission(orchestratorContext, jobSubmissionInterface.getJobSubmissionInterfaceId());
monitorMode = sshJobSubmission.getMonitorMode();
- }else {
+ } else if (jobSubmissionProtocol == JobSubmissionProtocol.UNICORE) {
+ monitorMode = MonitorMode.FORK;
+ } else {
logger.error("expId : {}, processId : {} :- Unsupported Job submission protocol {}.",
processModel.getExperimentId(), processModel.getProcessId(), jobSubmissionProtocol.name());
throw new OrchestratorException("Unsupported Job Submission Protocol " + jobSubmissionProtocol.name());
@@ -482,8 +491,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel();
submissionSubTask.setMonitorMode(monitorMode);
- submissionSubTask.setJobSubmissionProtocol(
- OrchestratorUtils.getPreferredJobSubmissionProtocol(orchestratorContext, processModel, gatewayId));
+ submissionSubTask.setJobSubmissionProtocol(jobSubmissionProtocol);
submissionSubTask.setWallTime(wallTime);
byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask);
taskModel.setSubTaskModel(bytes);