You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/11/12 00:40:17 UTC
airavata git commit: Use user compute resource preference if user
provided
Repository: airavata
Updated Branches:
refs/heads/develop f66043474 -> 6e5d1c6ee
Use user compute resource preference if user provided
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6e5d1c6e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6e5d1c6e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6e5d1c6e
Branch: refs/heads/develop
Commit: 6e5d1c6eee8cc6772ca4381da2ec5d2a8ac58a7d
Parents: f660434
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Nov 11 19:40:12 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Nov 11 19:40:12 2016 -0500
----------------------------------------------------------------------
.../core/utils/OrchestratorUtils.java | 231 +++++++++++++------
.../cpi/impl/SimpleOrchestratorImpl.java | 146 +++++++-----
2 files changed, 250 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/6e5d1c6e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
index 83c9273..61f7188 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
@@ -23,6 +23,7 @@ package org.apache.airavata.orchestrator.core.utils;
import java.io.IOException;
import java.util.*;
+import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
@@ -31,11 +32,13 @@ import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterfa
import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
import org.apache.airavata.model.data.movement.DataMovementInterface;
import org.apache.airavata.model.data.movement.DataMovementProtocol;
import org.apache.airavata.model.data.movement.SCPDataMovement;
import org.apache.airavata.model.data.movement.SecurityProtocol;
import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
@@ -50,69 +53,99 @@ import org.slf4j.LoggerFactory;
public class OrchestratorUtils {
private final static Logger logger = LoggerFactory.getLogger(OrchestratorUtils.class);
- public static OrchestratorConfiguration loadOrchestratorConfiguration() throws OrchestratorException, IOException, NumberFormatException, ApplicationSettingsException {
+ public static OrchestratorConfiguration loadOrchestratorConfiguration()
+ throws OrchestratorException, IOException, NumberFormatException, ApplicationSettingsException {
+
OrchestratorConfiguration orchestratorConfiguration = new OrchestratorConfiguration();
- orchestratorConfiguration.setSubmitterInterval(Integer.parseInt((String) ServerSettings.getSetting(OrchestratorConstants.SUBMIT_INTERVAL)));
- orchestratorConfiguration.setThreadPoolSize(Integer.parseInt((String) ServerSettings.getSetting(OrchestratorConstants.THREAD_POOL_SIZE)));
- orchestratorConfiguration.setStartSubmitter(Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.START_SUBMITTER)));
- orchestratorConfiguration.setEmbeddedMode(Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.EMBEDDED_MODE)));
- orchestratorConfiguration.setEnableValidation(Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.ENABLE_VALIDATION)));
+ orchestratorConfiguration.setSubmitterInterval(
+ Integer.parseInt(ServerSettings.getSetting(OrchestratorConstants.SUBMIT_INTERVAL)));
+ orchestratorConfiguration.setThreadPoolSize(
+ Integer.parseInt(ServerSettings.getSetting(OrchestratorConstants.THREAD_POOL_SIZE)));
+ orchestratorConfiguration.setStartSubmitter(
+ Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.START_SUBMITTER)));
+ orchestratorConfiguration.setEmbeddedMode(
+ Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.EMBEDDED_MODE)));
+ orchestratorConfiguration.setEnableValidation(
+ Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.ENABLE_VALIDATION)));
if (orchestratorConfiguration.isEnableValidation()) {
- orchestratorConfiguration.setValidatorClasses(Arrays.asList(ServerSettings.getSetting(OrchestratorConstants.JOB_VALIDATOR).split(",")));
+ orchestratorConfiguration.setValidatorClasses(
+ Arrays.asList(ServerSettings.getSetting(OrchestratorConstants.JOB_VALIDATOR).split(",")));
}
return orchestratorConfiguration;
}
- public static JobSubmissionProtocol getPreferredJobSubmissionProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException {
+ public static JobSubmissionProtocol getPreferredJobSubmissionProtocol(OrchestratorContext context,
+ ProcessModel model,
+ String gatewayId) throws RegistryException {
try {
- GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
String resourceHostId = model.getComputeResourceId();
- ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(gatewayId
- , resourceHostId);
- return preference.getPreferredJobSubmissionProtocol();
+ return getComputeResourcePreference(context, gatewayId, resourceHostId).getPreferredJobSubmissionProtocol();
} catch (AppCatalogException e) {
logger.error("Error occurred while initializing app catalog", e);
throw new RegistryException("Error occurred while initializing app catalog", e);
}
}
- public static String getApplicationInterfaceName(OrchestratorContext context, ProcessModel model) throws RegistryException {
+ public static ComputeResourcePreference getComputeResourcePreference(OrchestratorContext context,
+ String gatewayId,
+ String resourceHostId)
+ throws AppCatalogException, RegistryException {
+
+ GwyResourceProfile gatewayProfile = getGatewayProfile(context);
+ return gatewayProfile.getComputeResourcePreference(gatewayId
+ , resourceHostId);
+ }
+
+ public static GwyResourceProfile getGatewayProfile(OrchestratorContext context)
+ throws AppCatalogException, RegistryException {
+ return context.getRegistry().getAppCatalog().getGatewayProfile();
+ }
+
+ public static UsrResourceProfile getUserResourceProfile(OrchestratorContext context)
+ throws RegistryException, AppCatalogException {
+ return context.getRegistry().getAppCatalog().getUserResourceProfile();
+ }
+
+ public static String getApplicationInterfaceName(OrchestratorContext context, ProcessModel model)
+ throws RegistryException {
try {
ApplicationInterface applicationInterface = context.getRegistry().getAppCatalog().getApplicationInterface();
- ApplicationInterfaceDescription appInterface = applicationInterface.getApplicationInterface(model.getApplicationInterfaceId());
+ ApplicationInterfaceDescription appInterface =
+ applicationInterface.getApplicationInterface(model.getApplicationInterfaceId());
return appInterface.getApplicationName();
} catch (AppCatalogException e) {
throw new RegistryException("Error while retrieving application interface", e);
}
}
- public static DataMovementProtocol getPreferredDataMovementProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException {
+ public static DataMovementProtocol getPreferredDataMovementProtocol(OrchestratorContext context,
+ ProcessModel model,
+ String gatewayId) throws RegistryException {
try {
- GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
String resourceHostId = model.getComputeResourceId();
- ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(gatewayId
- , resourceHostId);
- return preference.getPreferredDataMovementProtocol();
+ return getComputeResourcePreference(context, gatewayId, resourceHostId).getPreferredDataMovementProtocol();
} catch (AppCatalogException e) {
logger.error("Error occurred while initializing app catalog", e);
throw new RegistryException("Error occurred while initializing app catalog", e);
}
}
- public static ComputeResourcePreference getComputeResourcePreference(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+ public static ComputeResourcePreference getComputeResourcePreference(OrchestratorContext context,
+ ProcessModel processModel,
+ String gatewayId) throws RegistryException {
try {
- GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
- String resourceHostId = processModel.getComputeResourceId();
- return gatewayProfile.getComputeResourcePreference(gatewayId, resourceHostId);
+ return getComputeResourcePreference(context, gatewayId, processModel.getComputeResourceId());
} catch (AppCatalogException e) {
logger.error("Error occurred while initializing app catalog", e);
throw new RegistryException("Error occurred while initializing app catalog", e);
}
}
- public static StoragePreference getStoragePreference(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+ public static StoragePreference getStoragePreference(OrchestratorContext context,
+ ProcessModel processModel,
+ String gatewayId) throws RegistryException {
try {
- GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
+ GwyResourceProfile gatewayProfile = getGatewayProfile(context);
String resourceHostId = processModel.getComputeResourceId();
return gatewayProfile.getStoragePreference(gatewayId, resourceHostId);
} catch (AppCatalogException e) {
@@ -121,46 +154,103 @@ public class OrchestratorUtils {
}
}
- public static String getLoginUserName(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+ public static String getLoginUserName(OrchestratorContext context,
+ ProcessModel processModel,
+ String gatewayId) throws RegistryException, AiravataException {
try {
- String loginUserName = null;
- String overrideLoginUserName = processModel.getProcessResourceSchedule().getOverrideLoginUserName();
- if (overrideLoginUserName != null && !overrideLoginUserName.equals("")) {
- loginUserName = overrideLoginUserName;
- } else {
- GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
- loginUserName = gatewayProfile.getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()).getLoginUserName();
+ ComputeResourcePreference computeResourcePreference = getComputeResourcePreference(context, gatewayId,
+ processModel.getComputeResourceId());
+ ComputationalResourceSchedulingModel processResourceSchedule = processModel.getProcessResourceSchedule();
+ if (processModel.isUseUserCRPref()) {
+ UsrResourceProfile userResourceProfile = getUserResourceProfile(context);
+ UserComputeResourcePreference userComputeResourcePreference = userResourceProfile
+ .getUserComputeResourcePreference(processModel.getUserName(), gatewayId,
+ processModel.getComputeResourceId());
+ if (isValid(userComputeResourcePreference.getLoginUserName())) {
+ return userComputeResourcePreference.getLoginUserName();
+ } else if (isValid(processResourceSchedule.getOverrideLoginUserName())) {
+ logger.warn("User computer resource preference doesn't have valid user login name, using computer " +
+ "resource scheduling login name " + processResourceSchedule.getOverrideLoginUserName());
+ return processResourceSchedule.getOverrideLoginUserName();
+ } else if (isValid(computeResourcePreference.getLoginUserName())) {
+ logger.warn("Either User computer resource preference or computer resource scheduling " +
+ "doesn't have valid user login name, using gateway computer resource preference login name "
+ + computeResourcePreference.getLoginUserName());
+ return computeResourcePreference.getLoginUserName();
+ }else {
+ throw new AiravataException("Login name is not found");
+ }
+ }else {
+ if (isValid(processResourceSchedule.getOverrideLoginUserName())) {
+ return processResourceSchedule.getOverrideLoginUserName();
+ } else if (isValid(computeResourcePreference.getLoginUserName())) {
+ logger.warn("Process compute resource scheduling doesn't have valid user login name, " +
+ "using gateway computer resource preference login name "
+ + computeResourcePreference.getLoginUserName());
+ return computeResourcePreference.getLoginUserName();
+ }else {
+ throw new AiravataException("Login name is not found");
+ }
}
- return loginUserName;
} catch (AppCatalogException e) {
logger.error("Error occurred while initializing app catalog to fetch login username", e);
throw new RegistryException("Error occurred while initializing app catalog to fetch login username", e);
}
}
- public static String getScratchLocation(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+ public static String getScratchLocation(OrchestratorContext context,
+ ProcessModel processModel,
+ String gatewayId) throws RegistryException, AiravataException {
try {
- String scratchLocation = null;
- String overrideScratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation();
- if (overrideScratchLocation != null && !overrideScratchLocation.equals("")) {
- scratchLocation = overrideScratchLocation;
- } else {
- GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
- scratchLocation = gatewayProfile.getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()).getScratchLocation();
+ ComputeResourcePreference computeResourcePreference = getComputeResourcePreference(context, gatewayId,
+ processModel.getComputeResourceId());
+ ComputationalResourceSchedulingModel processResourceSchedule = processModel.getProcessResourceSchedule();
+ if (processModel.isUseUserCRPref()) {
+ UsrResourceProfile userResourceProfile = getUserResourceProfile(context);
+ UserComputeResourcePreference userComputeResourcePreference = userResourceProfile
+ .getUserComputeResourcePreference(processModel.getUserName(), gatewayId,
+ processModel.getComputeResourceId());
+ if (isValid(userComputeResourcePreference.getScratchLocation())) {
+ return userComputeResourcePreference.getScratchLocation();
+ } else if (isValid(processResourceSchedule.getOverrideScratchLocation())) {
+ logger.warn("User computer resource preference doesn't have valid scratch location, using computer " +
+ "resource scheduling scratch location " + processResourceSchedule.getOverrideScratchLocation());
+ return processResourceSchedule.getOverrideScratchLocation();
+ } else if (isValid(computeResourcePreference.getScratchLocation())) {
+ logger.warn("Either User computer resource preference or computer resource scheduling doesn't have " +
+ "valid scratch location, using gateway computer resource preference scratch location"
+ + computeResourcePreference.getScratchLocation());
+ return computeResourcePreference.getScratchLocation();
+ }else {
+ throw new AiravataException("Scratch location is not found");
+ }
+ }else {
+ if (isValid(processResourceSchedule.getOverrideScratchLocation())) {
+ return processResourceSchedule.getOverrideScratchLocation();
+ } else if (isValid(computeResourcePreference.getScratchLocation())) {
+ logger.warn("Process compute resource scheduling doesn't have valid scratch location, " +
+ "using gateway computer resource preference scratch location"
+ + computeResourcePreference.getScratchLocation());
+ return computeResourcePreference.getScratchLocation();
+ }else {
+ throw new AiravataException("Scratch location is not found");
+ }
}
- return scratchLocation;
} catch (AppCatalogException e) {
logger.error("Error occurred while initializing app catalog to fetch scratch location", e);
throw new RegistryException("Error occurred while initializing app catalog to fetch scratch location", e);
}
}
- public static JobSubmissionInterface getPreferredJobSubmissionInterface(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+ public static JobSubmissionInterface getPreferredJobSubmissionInterface(OrchestratorContext context,
+ ProcessModel processModel,
+ String gatewayId) throws RegistryException {
try {
String resourceHostId = processModel.getComputeResourceId();
ComputeResourcePreference resourcePreference = getComputeResourcePreference(context, processModel, gatewayId);
JobSubmissionProtocol preferredJobSubmissionProtocol = resourcePreference.getPreferredJobSubmissionProtocol();
- ComputeResourceDescription resourceDescription = context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+ ComputeResourceDescription resourceDescription =
+ context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces();
Map<JobSubmissionProtocol, List<JobSubmissionInterface>> orderedInterfaces = new HashMap<>();
List<JobSubmissionInterface> interfaces = new ArrayList<>();
@@ -178,21 +268,14 @@ public class OrchestratorUtils {
}
}
}else {
- Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
- @Override
- public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
- return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
- }
- });
+ Collections.sort(jobSubmissionInterfaces,
+ (jobSubmissionInterface, jobSubmissionInterface2) ->
+ jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder());
}
}
interfaces = orderedInterfaces.get(preferredJobSubmissionProtocol);
- Collections.sort(interfaces, new Comparator<JobSubmissionInterface>() {
- @Override
- public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
- return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
- }
- });
+ Collections.sort(interfaces, (jobSubmissionInterface, jobSubmissionInterface2) ->
+ jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder());
} else {
throw new RegistryException("Compute resource should have at least one job submission interface defined...");
}
@@ -202,12 +285,15 @@ public class OrchestratorUtils {
}
}
- public static DataMovementInterface getPrefferredDataMovementInterface(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+ public static DataMovementInterface getPrefferredDataMovementInterface(OrchestratorContext context,
+ ProcessModel processModel,
+ String gatewayId) throws RegistryException {
try {
String resourceHostId = processModel.getComputeResourceId();
ComputeResourcePreference resourcePreference = getComputeResourcePreference(context, processModel, gatewayId);
DataMovementProtocol preferredDataMovementProtocol = resourcePreference.getPreferredDataMovementProtocol();
- ComputeResourceDescription resourceDescription = context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+ ComputeResourceDescription resourceDescription =
+ context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
List<DataMovementInterface> dataMovementInterfaces = resourceDescription.getDataMovementInterfaces();
if (dataMovementInterfaces != null && !dataMovementInterfaces.isEmpty()) {
for (DataMovementInterface dataMovementInterface : dataMovementInterfaces){
@@ -226,7 +312,9 @@ public class OrchestratorUtils {
return null;
}
- public static int getDataMovementPort(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{
+ public static int getDataMovementPort(OrchestratorContext context,
+ ProcessModel processModel,
+ String gatewayId) throws RegistryException{
try {
DataMovementProtocol protocol = getPreferredDataMovementProtocol(context, processModel, gatewayId);
DataMovementInterface dataMovementInterface = getPrefferredDataMovementInterface(context, processModel, gatewayId);
@@ -243,7 +331,9 @@ public class OrchestratorUtils {
}
- public static SecurityProtocol getSecurityProtocol(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{
+ public static SecurityProtocol getSecurityProtocol(OrchestratorContext context,
+ ProcessModel processModel,
+ String gatewayId) throws RegistryException{
try {
JobSubmissionProtocol submissionProtocol = getPreferredJobSubmissionProtocol(context, processModel, gatewayId);
JobSubmissionInterface jobSubmissionInterface = getPreferredJobSubmissionInterface(context, processModel, gatewayId);
@@ -274,7 +364,8 @@ public class OrchestratorUtils {
return null;
}
- public static LOCALSubmission getLocalJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException {
+ public static LOCALSubmission getLocalJobSubmission(OrchestratorContext context,
+ String submissionId) throws RegistryException {
try {
AppCatalog appCatalog = context.getRegistry().getAppCatalog();
return appCatalog.getComputeResource().getLocalJobSubmission(submissionId);
@@ -285,7 +376,8 @@ public class OrchestratorUtils {
}
}
- public static UnicoreJobSubmission getUnicoreJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException {
+ public static UnicoreJobSubmission getUnicoreJobSubmission(OrchestratorContext context,
+ String submissionId) throws RegistryException {
try {
AppCatalog appCatalog = context.getRegistry().getAppCatalog();
return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId);
@@ -296,7 +388,8 @@ public class OrchestratorUtils {
}
}
- public static SSHJobSubmission getSSHJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException {
+ public static SSHJobSubmission getSSHJobSubmission(OrchestratorContext context,
+ String submissionId) throws RegistryException {
try {
AppCatalog appCatalog = context.getRegistry().getAppCatalog();
return appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
@@ -307,7 +400,8 @@ public class OrchestratorUtils {
}
}
- public static CloudJobSubmission getCloudJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException {
+ public static CloudJobSubmission getCloudJobSubmission(OrchestratorContext context,
+ String submissionId) throws RegistryException {
try {
AppCatalog appCatalog = context.getRegistry().getAppCatalog();
return appCatalog.getComputeResource().getCloudJobSubmission(submissionId);
@@ -318,7 +412,8 @@ public class OrchestratorUtils {
}
}
- public static SCPDataMovement getSCPDataMovement(OrchestratorContext context, String dataMoveId) throws RegistryException {
+ public static SCPDataMovement getSCPDataMovement(OrchestratorContext context,
+ String dataMoveId) throws RegistryException {
try {
AppCatalog appCatalog = context.getRegistry().getAppCatalog();
return appCatalog.getComputeResource().getSCPDataMovement(dataMoveId);
@@ -328,4 +423,8 @@ public class OrchestratorUtils {
throw new RegistryException(errorMsg, e);
}
}
+
+ private static boolean isValid(String str) { // valid means: non-null and not blank after trimming
+ return str != null && !str.trim().isEmpty();
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6e5d1c6e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index b97e79a..19a3521 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.orchestrator.cpi.impl;
+import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.gfac.core.task.TaskException;
@@ -93,15 +94,18 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
}
}
- public ValidationResults validateExperiment(ExperimentModel experiment) throws OrchestratorException,LaunchValidationException {
- org.apache.airavata.model.error.ValidationResults validationResults = new org.apache.airavata.model.error.ValidationResults();
+ public ValidationResults validateExperiment(ExperimentModel experiment)
+ throws OrchestratorException,LaunchValidationException {
+ org.apache.airavata.model.error.ValidationResults validationResults =
+ new org.apache.airavata.model.error.ValidationResults();
validationResults.setValidationState(true); // initially making it to success, if atleast one failed them simply mark it failed.
String errorMsg = "Validation Errors : ";
if (this.orchestratorConfiguration.isEnableValidation()) {
List<String> validatorClasses = this.orchestratorContext.getOrchestratorConfiguration().getValidatorClasses();
for (String validator : validatorClasses) {
try {
- Class<? extends JobMetadataValidator> vClass = Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class);
+ Class<? extends JobMetadataValidator> vClass =
+ Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class);
JobMetadataValidator jobMetadataValidator = vClass.newInstance();
validationResults = jobMetadataValidator.validate(experiment, null);
if (validationResults.isValidationState()) {
@@ -116,14 +120,15 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
}
}
}
- logger.error("Validation of " + validator + " for experiment Id " + experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg);
+ logger.error("Validation of " + validator + " for experiment Id " +
+ experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg);
validationResults.setValidationState(false);
try {
ErrorModel details = new ErrorModel();
details.setActualErrorMessage(errorMsg);
details.setCreationTime(Calendar.getInstance().getTimeInMillis());
- orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, details,
- experiment.getExperimentId());
+ orchestratorContext.getRegistry().getExperimentCatalog()
+ .add(ExpCatChildDataType.EXPERIMENT_ERROR, details, experiment.getExperimentId());
} catch (RegistryException e) {
logger.error("Error while saving error details to registry", e);
}
@@ -147,12 +152,15 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
//atleast one validation has failed, so we throw an exception
LaunchValidationException launchValidationException = new LaunchValidationException();
launchValidationException.setValidationResult(validationResults);
- launchValidationException.setErrorMessage("Validation failed refer the validationResults list for detail error. Validation errors : " + errorMsg);
+ launchValidationException.setErrorMessage("Validation failed refer the validationResults list for " +
+ "detail error. Validation errors : " + errorMsg);
throw launchValidationException;
}
}
- public ValidationResults validateProcess(ExperimentModel experiment, ProcessModel processModel) throws OrchestratorException,LaunchValidationException {
+ public ValidationResults validateProcess(ExperimentModel experiment, ProcessModel processModel)
+ throws OrchestratorException, LaunchValidationException {
+
org.apache.airavata.model.error.ValidationResults validationResults = new org.apache.airavata.model.error.ValidationResults();
validationResults.setValidationState(true); // initially making it to success, if atleast one failed them simply mark it failed.
String errorMsg = "Validation Errors : ";
@@ -167,46 +175,42 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
logger.info("Validation of " + validator + " is SUCCESSFUL");
} else {
List<ValidatorResult> validationResultList = validationResults.getValidationResultList();
- for (ValidatorResult result : validationResultList){
- if (!result.isResult()){
+ for (ValidatorResult result : validationResultList) {
+ if (!result.isResult()) {
String validationError = result.getErrorDetails();
- if (validationError != null){
+ if (validationError != null) {
errorMsg += validationError + " ";
}
}
}
- logger.error("Validation of " + validator + " for experiment Id " + experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg);
+ logger.error("Validation of " + validator + " for experiment Id " +
+ experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg);
validationResults.setValidationState(false);
try {
ErrorModel details = new ErrorModel();
details.setActualErrorMessage(errorMsg);
details.setCreationTime(Calendar.getInstance().getTimeInMillis());
- orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.PROCESS_ERROR, details,
- processModel.getProcessId());
+ orchestratorContext.getRegistry().getExperimentCatalog()
+ .add(ExpCatChildDataType.PROCESS_ERROR, details, processModel.getProcessId());
} catch (RegistryException e) {
logger.error("Error while saving error details to registry", e);
}
break;
}
- } catch (ClassNotFoundException e) {
- logger.error("Error loading the validation class: ", validator, e);
- validationResults.setValidationState(false);
- } catch (InstantiationException e) {
- logger.error("Error loading the validation class: ", validator, e);
- validationResults.setValidationState(false);
- } catch (IllegalAccessException e) {
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
logger.error("Error loading the validation class: ", validator, e);
validationResults.setValidationState(false);
}
}
}
- if(validationResults.isValidationState()){
+ if (validationResults.isValidationState()) {
return validationResults;
- }else {
+ } else {
//atleast one validation has failed, so we throw an exception
LaunchValidationException launchValidationException = new LaunchValidationException();
launchValidationException.setValidationResult(validationResults);
- launchValidationException.setErrorMessage("Validation failed refer the validationResults list for detail error. Validation errors : " + errorMsg);
+ launchValidationException.setErrorMessage("Validation failed refer the validationResults " +
+ "list for detail error. Validation errors : " + errorMsg);
throw launchValidationException;
}
}
@@ -253,7 +257,8 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
try {
Registry registry = orchestratorContext.getRegistry();
ExperimentModel experimentModel = (ExperimentModel)registry.getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
- List<Object> processList = registry.getExperimentCatalog().get(ExperimentCatalogModelType.PROCESS, Constants.FieldConstants.ExperimentConstants.EXPERIMENT_ID, experimentId);
+ List<Object> processList = registry.getExperimentCatalog()
+ .get(ExperimentCatalogModelType.PROCESS, Constants.FieldConstants.ExperimentConstants.EXPERIMENT_ID, experimentId);
if (processList != null && !processList.isEmpty()) {
for (Object processObject : processList) {
ProcessModel processModel = (ProcessModel)processObject;
@@ -283,8 +288,10 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
throw new OrchestratorException("Compute Resource Id cannot be null at this point");
}
ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
- JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId);
- ComputeResourcePreference resourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+ JobSubmissionInterface preferredJobSubmissionInterface =
+ OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId);
+ ComputeResourcePreference resourcePreference =
+ OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
List<String> taskIdList = new ArrayList<>();
if (resourcePreference.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.UNICORE) {
@@ -303,19 +310,22 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
// need to create more job submissions
int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime / maxRunTime));
for (int i = 1; i <= numOfMaxWallTimeJobs; i++) {
- taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, maxRunTime));
+ taskIdList.addAll(
+ createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, maxRunTime));
}
int leftWallTime = userGivenWallTime % maxRunTime;
if (leftWallTime != 0) {
- taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, leftWallTime));
+ taskIdList.addAll(
+ createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, leftWallTime));
}
} else {
- taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
+ taskIdList.addAll(
+ createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, userGivenWallTime));
}
}
}
} else {
- taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
+ taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, userGivenWallTime));
}
taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId));
}
@@ -342,7 +352,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
private List<String> createAndSaveEnvSetupTask(String gatewayId,
ProcessModel processModel,
ExperimentCatalog experimentCatalog)
- throws RegistryException, TException {
+ throws RegistryException, TException, AiravataException {
List<String> envTaskIds = new ArrayList<>();
TaskModel envSetupTask = new TaskModel();
envSetupTask.setTaskType(TaskTypes.ENV_SETUP);
@@ -363,7 +373,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
return envTaskIds;
}
- public List<String> createAndSaveInputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException {
+ public List<String> createAndSaveInputDataStagingTasks(ProcessModel processModel, String gatewayId)
+ throws RegistryException, AiravataException {
+
List<String> dataStagingTaskIds = new ArrayList<>();
List<InputDataObjectType> processInputs = processModel.getProcessInputs();
@@ -380,8 +392,8 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
case URI_COLLECTION:
try {
TaskModel inputDataStagingTask = getInputDataStagingTask(processModel, processInput, gatewayId);
- String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, inputDataStagingTask,
- processModel.getProcessId());
+ String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog()
+ .add(ExpCatChildDataType.TASK, inputDataStagingTask, processModel.getProcessId());
inputDataStagingTask.setTaskId(taskId);
dataStagingTaskIds.add(inputDataStagingTask.getTaskId());
} catch (TException | AppCatalogException | TaskException e) {
@@ -397,7 +409,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
return dataStagingTaskIds;
}
- public List<String> createAndSaveOutputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException {
+ public List<String> createAndSaveOutputDataStagingTasks(ProcessModel processModel, String gatewayId)
+ throws RegistryException, AiravataException {
+
List<String> dataStagingTaskIds = new ArrayList<>();
List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs();
String appName = OrchestratorUtils.getApplicationInterfaceName(orchestratorContext, processModel);
@@ -405,14 +419,14 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
for (OutputDataObjectType processOutput : processOutputs) {
DataType type = processOutput.getType();
switch (type) {
- case STDOUT :
- if(null == processOutput.getValue() || processOutput.getValue().trim().isEmpty()){
+ case STDOUT:
+ if (null == processOutput.getValue() || processOutput.getValue().trim().isEmpty()) {
processOutput.setValue(appName + ".stdout");
}
createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput);
break;
case STDERR:
- if(null == processOutput.getValue() || processOutput.getValue().trim().isEmpty()){
+ if (null == processOutput.getValue() || processOutput.getValue().trim().isEmpty()) {
processOutput.setValue(appName + ".stderr");
}
createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput);
@@ -439,29 +453,35 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
private boolean isArchive(ProcessModel processModel, String gatewayId) throws AppCatalogException {
AppCatalog appCatalog = RegistryFactory.getAppCatalog();
- ApplicationInterfaceDescription appInterface = appCatalog.getApplicationInterface().getApplicationInterface(processModel.getApplicationInterfaceId());
+ ApplicationInterfaceDescription appInterface = appCatalog.getApplicationInterface()
+ .getApplicationInterface(processModel.getApplicationInterfaceId());
return appInterface.isArchiveWorkingDirectory();
}
- private void createArchiveDataStatgingTask(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds) throws RegistryException {
+ private void createArchiveDataStatgingTask(ProcessModel processModel,
+ String gatewayId,
+ List<String> dataStagingTaskIds) throws RegistryException, AiravataException {
TaskModel archiveTask = null;
try {
archiveTask = getOutputDataStagingTask(processModel, null, gatewayId);
} catch (TException e) {
throw new RegistryException("Error! DataStaging sub task serialization failed");
}
- String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, archiveTask,
- processModel.getProcessId());
+ String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog()
+ .add(ExpCatChildDataType.TASK, archiveTask, processModel.getProcessId());
archiveTask.setTaskId(taskId);
dataStagingTaskIds.add(archiveTask.getTaskId());
}
- private void createOutputDataSatagingTasks(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds, OutputDataObjectType processOutput) throws RegistryException {
+ private void createOutputDataSatagingTasks(ProcessModel processModel,
+ String gatewayId,
+ List<String> dataStagingTaskIds,
+ OutputDataObjectType processOutput) throws RegistryException, AiravataException {
try {
TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
- String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
- processModel.getProcessId());
+ String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog()
+ .add(ExpCatChildDataType.TASK, outputDataStagingTask, processModel.getProcessId());
outputDataStagingTask.setTaskId(taskId);
dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
} catch (TException e) {
@@ -469,7 +489,10 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
}
}
- private List<String> createAndSaveSubmissionTasks(String gatewayId, JobSubmissionInterface jobSubmissionInterface, ProcessModel processModel, int wallTime)
+ private List<String> createAndSaveSubmissionTasks(String gatewayId,
+ JobSubmissionInterface jobSubmissionInterface,
+ ProcessModel processModel,
+ int wallTime)
throws TException, RegistryException, OrchestratorException {
JobSubmissionProtocol jobSubmissionProtocol = jobSubmissionInterface.getJobSubmissionProtocol();
@@ -539,7 +562,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
});
}
- private TaskModel getInputDataStagingTask(ProcessModel processModel, InputDataObjectType processInput, String gatewayId) throws RegistryException, TException, AppCatalogException, TaskException {
+ private TaskModel getInputDataStagingTask(ProcessModel processModel, InputDataObjectType processInput, String gatewayId) throws RegistryException, TException, AppCatalogException, TaskException, AiravataException {
// create new task model for this task
TaskModel taskModel = new TaskModel();
taskModel.setParentProcessId(processModel.getProcessId());
@@ -551,19 +574,20 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
taskModel.setTaskType(TaskTypes.DATA_STAGING);
// create data staging sub task model
DataStagingTaskModel submodel = new DataStagingTaskModel();
- ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
- ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
- String remoteOutputDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) + File.separator + processModel.getProcessId();
- remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
+ ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog()
+ .getComputeResource().getComputeResource(processModel.getComputeResourceId());
+ String workingDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) +
+ File.separator + processModel.getProcessId() + File.separator;
URI destination = null;
try {
- DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
+ DataMovementProtocol dataMovementProtocol =
+ OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
String loginUserName = OrchestratorUtils.getLoginUserName(orchestratorContext, processModel, gatewayId);
destination = new URI(dataMovementProtocol.name(),
loginUserName,
computeResource.getHostName(),
OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
- remoteOutputDir , null, null);
+ workingDir , null, null);
} catch (URISyntaxException e) {
throw new TaskException("Error while constructing destination file URI");
}
@@ -575,7 +599,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
return taskModel;
}
- private TaskModel getOutputDataStagingTask(ProcessModel processModel, OutputDataObjectType processOutput, String gatewayId) throws RegistryException, TException {
+ private TaskModel getOutputDataStagingTask(ProcessModel processModel, OutputDataObjectType processOutput, String gatewayId) throws RegistryException, TException, AiravataException {
try {
// create new task model for this task
@@ -587,11 +611,11 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
taskModel.setTaskStatuses(Arrays.asList(taskStatus));
taskModel.setTaskType(TaskTypes.DATA_STAGING);
- ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
- ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
+ ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog()
+ .getComputeResource().getComputeResource(processModel.getComputeResourceId());
- String remoteOutputDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) + File.separator + processModel.getProcessId();
- remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
+ String workingDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId)
+ + File.separator + processModel.getProcessId() + File.separator;
DataStagingTaskModel submodel = new DataStagingTaskModel();
DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
URI source = null;
@@ -604,7 +628,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
loginUserName,
computeResource.getHostName(),
OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
- remoteOutputDir + processOutput.getValue(), null, null);
+ workingDir + processOutput.getValue(), null, null);
} else {
// archive
submodel.setType(DataStageType.ARCHIVE_OUTPUT);
@@ -612,7 +636,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
loginUserName,
computeResource.getHostName(),
OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
- remoteOutputDir, null, null);
+ workingDir, null, null);
}
} catch (URISyntaxException e) {
throw new TaskException("Error while constructing source file URI");