You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2022/01/01 23:49:20 UTC
[airavata] branch AIRAVATA-3549 updated: Making task context load on demand
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch AIRAVATA-3549
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/AIRAVATA-3549 by this push:
new 41b67ac Making task context load on demand
41b67ac is described below
commit 41b67acb2ae05bf29d5d625caa515d2efb153674
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Sat Jan 1 18:49:11 2022 -0500
Making task context load on demand
---
.../airavata/helix/impl/task/AiravataTask.java | 6 +-
.../airavata/helix/impl/task/TaskContext.java | 341 +++++++++++----------
.../helix/impl/task/staging/DataStagingTask.java | 14 +-
.../task/submission/DefaultJobSubmissionTask.java | 6 +-
.../task/submission/config/GroovyMapBuilder.java | 5 +-
5 files changed, 195 insertions(+), 177 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index c6cf095..eca6aff 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -279,7 +279,7 @@ public abstract class AiravataTask extends AbstractTask {
}
}
- } catch (TException e) {
+ } catch (Exception e) {
String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " : - Error while updating experiment outputs";
throw new TaskOnFailException(msg, true, e);
}
@@ -303,13 +303,13 @@ public abstract class AiravataTask extends AbstractTask {
}
}
- } catch (TException e) {
+ } catch (Exception e) {
String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " : - Error while updating experiment outputs";
throw new TaskOnFailException(msg, true, e);
}
}
- private String saveDataProduct(String outputName, String outputVal, String outputMetadata) throws TException {
+ private String saveDataProduct(String outputName, String outputVal, String outputMetadata) throws Exception {
DataProductModel dataProductModel = new DataProductModel();
dataProductModel.setGatewayId(getGatewayId());
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 7e32c00..1533613 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -21,6 +21,7 @@ package org.apache.airavata.helix.impl.task;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.helix.core.util.TaskUtil;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
@@ -174,7 +175,7 @@ public class TaskContext {
this.processModel = processModel;
}
- public String getWorkingDir() {
+ public String getWorkingDir() throws Exception {
if (workingDir == null) {
if (processModel.getProcessResourceSchedule().getStaticWorkingDir() != null){
workingDir = processModel.getProcessResourceSchedule().getStaticWorkingDir();
@@ -187,17 +188,17 @@ public class TaskContext {
return workingDir;
}
- public String getScratchLocation() {
+ public String getScratchLocation() throws Exception {
if (scratchLocation == null) {
if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getScratchLocation())) {
- scratchLocation = userComputeResourcePreference.getScratchLocation();
+ getUserComputeResourcePreference() != null &&
+ isValid(getUserComputeResourcePreference().getScratchLocation())) {
+ scratchLocation = getUserComputeResourcePreference().getScratchLocation();
} else if (isValid(processModel.getProcessResourceSchedule().getOverrideScratchLocation())) {
scratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation();
- } else if (isSetGroupResourceProfile() && groupComputeResourcePreference != null &&
- isValid(groupComputeResourcePreference.getScratchLocation())) {
- scratchLocation = groupComputeResourcePreference.getScratchLocation();
+ } else if (isSetGroupResourceProfile() && getGroupComputeResourcePreference() != null &&
+ isValid(getGroupComputeResourcePreference().getScratchLocation())) {
+ scratchLocation = getGroupComputeResourcePreference().getScratchLocation();
} else {
throw new RuntimeException("Can't find a specified scratch location for compute resource " + getComputeResourceId());
}
@@ -210,7 +211,15 @@ public class TaskContext {
this.workingDir = workingDir;
}
- public GatewayResourceProfile getGatewayResourceProfile() {
+ public GatewayResourceProfile getGatewayResourceProfile() throws Exception {
+ if (this.gatewayResourceProfile == null) { // load on first use; the guard must test the field this getter caches
+ try {
+ gatewayResourceProfile = registryClient.getGatewayResourceProfile(gatewayId);
+ } catch (TException e) {
+ logger.error("Failed to fetch gateway resource profile for gateway {}", gatewayId, e); // trailing throwable keeps the stack trace
+ throw e;
+ }
+ }
return gatewayResourceProfile;
}
@@ -218,7 +227,15 @@ public class TaskContext {
this.gatewayResourceProfile = gatewayResourceProfile;
}
- public GroupResourceProfile getGroupResourceProfile() {
+ public GroupResourceProfile getGroupResourceProfile() throws Exception {
+ if (groupResourceProfile == null) { // load on first use, then reuse the cached value
+ try {
+ groupResourceProfile = registryClient.getGroupResourceProfile(processModel.getGroupResourceProfileId());
+ } catch (TException e) {
+ logger.error("Failed to find a group resource profile with id {}", processModel.getGroupResourceProfileId(), e);
+ throw e;
+ }
+ }
return groupResourceProfile;
}
@@ -226,7 +243,19 @@ public class TaskContext {
this.groupResourceProfile = groupResourceProfile;
}
- public GroupComputeResourcePreference getGroupComputeResourcePreference() {
+ public GroupComputeResourcePreference getGroupComputeResourcePreference() throws Exception {
+
+ if (groupComputeResourcePreference == null) { // load on first use, then reuse the cached value
+ try {
+ groupComputeResourcePreference = registryClient.getGroupComputeResourcePreference(
+ processModel.getComputeResourceId(),
+ processModel.getGroupResourceProfileId());
+ } catch (TException e) {
+ logger.error("Failed to find group compute resource preference for compute {}, and group resource profile {}",
+ processModel.getComputeResourceId(), processModel.getGroupResourceProfileId(), e);
+ throw e;
+ }
+ }
return groupComputeResourcePreference;
}
@@ -234,7 +263,17 @@ public class TaskContext {
this.groupComputeResourcePreference = groupComputeResourcePreference;
}
- public UserResourceProfile getUserResourceProfile() {
+ public UserResourceProfile getUserResourceProfile() throws Exception {
+
+ if (userResourceProfile == null && processModel.isUseUserCRPref()) {
+ try {
+ this.userResourceProfile = registryClient.getUserResourceProfile(processModel.getUserName(), gatewayId);
+ } catch (TException e) {
+ logger.error("Failed to fetch user resource profile for user {} in gateway {}",
+ processModel.getUserName(), gatewayId, e);
+ throw e;
+ }
+ }
return userResourceProfile;
}
@@ -242,8 +281,20 @@ public class TaskContext {
this.userResourceProfile = userResourceProfile;
}
- private UserComputeResourcePreference getUserComputeResourcePreference() {
- return userComputeResourcePreference;
+ private UserComputeResourcePreference getUserComputeResourcePreference() throws Exception {
+ if (this.userComputeResourcePreference == null && processModel.isUseUserCRPref()) {
+ try {
+ this.userComputeResourcePreference = registryClient.getUserComputeResourcePreference(
+ processModel.getUserName(),
+ gatewayId,
+ processModel.getComputeResourceId());
+ } catch (TException e) {
+ logger.error("Failed to fetch user compute resource preference for user {} compute resource {} in gateway {}",
+ processModel.getUserName(), processModel.getComputeResourceId(), gatewayId, e);
+ throw e;
+ }
+ }
+ return this.userComputeResourcePreference;
}
public void setUserComputeResourcePreference(UserComputeResourcePreference userComputeResourcePreference) {
@@ -258,7 +309,19 @@ public class TaskContext {
this.userStoragePreference = userStoragePreference;
}
- public StoragePreference getGatewayStorageResourcePreference() {
+ public StoragePreference getGatewayStorageResourcePreference() throws Exception {
+ if (this.gatewayStorageResourcePreference == null) {
+ try {
+ this.gatewayStorageResourcePreference = registryClient.getGatewayStoragePreference(
+ gatewayId,
+ processModel.getStorageResourceId());
+ } catch (TException e) {
+ logger.error("Failed to fetch gateway storage preference for gateway {} and storage {}",
+ gatewayId,
+ processModel.getStorageResourceId(), e);
+ throw e;
+ }
+ }
return gatewayStorageResourcePreference;
}
@@ -266,7 +329,10 @@ public class TaskContext {
this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
}
- public ComputeResourceDescription getComputeResourceDescription() {
+ public ComputeResourceDescription getComputeResourceDescription() throws Exception {
+ if (this.computeResourceDescription == null) {
+ this.computeResourceDescription = registryClient.getComputeResource(getComputeResourceId());
+ }
return computeResourceDescription;
}
@@ -274,7 +340,17 @@ public class TaskContext {
this.computeResourceDescription = computeResourceDescription;
}
- public ApplicationDeploymentDescription getApplicationDeploymentDescription() {
+ public ApplicationDeploymentDescription getApplicationDeploymentDescription() throws Exception {
+ if (this.applicationDeploymentDescription == null) {
+ try {
+ this.applicationDeploymentDescription = registryClient.getApplicationDeployment(
+ processModel.getApplicationDeploymentId());
+ } catch (TException e) {
+ logger.error("Failed to fetch application deployment with id {}",
+ processModel.getApplicationDeploymentId(), e);
+ throw e;
+ }
+ }
return applicationDeploymentDescription;
}
@@ -283,7 +359,17 @@ public class TaskContext {
this.applicationDeploymentDescription = applicationDeploymentDescription;
}
- public ApplicationInterfaceDescription getApplicationInterfaceDescription() {
+ public ApplicationInterfaceDescription getApplicationInterfaceDescription() throws Exception {
+ if (this.applicationInterfaceDescription == null) {
+ try {
+ this.applicationInterfaceDescription = registryClient.getApplicationInterface(
+ processModel.getApplicationInterfaceId());
+ } catch (TException e) {
+ logger.error("Failed to fetch application interface with id {}",
+ processModel.getApplicationInterfaceId(), e);
+ throw e;
+ }
+ }
return applicationInterfaceDescription;
}
@@ -311,14 +397,14 @@ public class TaskContext {
this.outputDir = outputDir;
}
- public String getOutputDir() {
+ public String getOutputDir() throws Exception {
if (outputDir == null) {
outputDir = getWorkingDir();
}
return outputDir;
}
- public String getInputDir() {
+ public String getInputDir() throws Exception {
if (inputDir == null) {
inputDir = getWorkingDir();
}
@@ -329,23 +415,25 @@ public class TaskContext {
this.inputDir = inputDir;
}
- public JobSubmissionProtocol getJobSubmissionProtocol() {
+ public JobSubmissionProtocol getJobSubmissionProtocol() throws Exception {
if (jobSubmissionProtocol == null) {
// Take highest priority one
- List<JobSubmissionInterface> jobSubmissionInterfaces = computeResourceDescription.getJobSubmissionInterfaces();
+ List<JobSubmissionInterface> jobSubmissionInterfaces = getComputeResourceDescription()
+ .getJobSubmissionInterfaces();
Collections.sort(jobSubmissionInterfaces, Comparator.comparingInt(JobSubmissionInterface::getPriorityOrder));
jobSubmissionProtocol = jobSubmissionInterfaces.get(0).getJobSubmissionProtocol();
}
return jobSubmissionProtocol;
}
+
public void setJobSubmissionProtocol(JobSubmissionProtocol jobSubmissionProtocol) {
this.jobSubmissionProtocol = jobSubmissionProtocol;
}
- public DataMovementProtocol getDataMovementProtocol() {
+ public DataMovementProtocol getDataMovementProtocol() throws Exception {
if (dataMovementProtocol == null) {
// Take highest priority one
- List<DataMovementInterface> dataMovementInterfaces = computeResourceDescription.getDataMovementInterfaces();
+ List<DataMovementInterface> dataMovementInterfaces = getComputeResourceDescription().getDataMovementInterfaces();
Collections.sort(dataMovementInterfaces, Comparator.comparingInt(DataMovementInterface::getPriorityOrder));
dataMovementProtocol = dataMovementInterfaces.get(0).getDataMovementProtocol();
}
@@ -394,7 +482,7 @@ public class TaskContext {
return taskMap;
}
- public JobModel getJobModel() {
+ public JobModel getJobModel() throws Exception {
if (jobModel == null) {
jobModel = new JobModel();
jobModel.setProcessId(processId);
@@ -447,46 +535,46 @@ public class TaskContext {
return null;
}
- public String getComputeResourceId() {
+ public String getComputeResourceId() throws Exception {
if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getComputeResourceId())) {
- return userComputeResourcePreference.getComputeResourceId();
+ getUserComputeResourcePreference() != null &&
+ isValid(getUserComputeResourcePreference().getComputeResourceId())) {
+ return getUserComputeResourcePreference().getComputeResourceId();
} else {
- return groupComputeResourcePreference.getComputeResourceId();
+ return getGroupComputeResourcePreference().getComputeResourceId();
}
}
- public String getComputeResourceCredentialToken(){
+ public String getComputeResourceCredentialToken() throws Exception {
if (isUseUserCRPref()) {
- if (userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
- return userComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+ if (getUserComputeResourcePreference() != null &&
+ isValid(getUserComputeResourcePreference().getResourceSpecificCredentialStoreToken())) {
+ return getUserComputeResourcePreference().getResourceSpecificCredentialStoreToken();
} else {
- return userResourceProfile.getCredentialStoreToken();
+ return getUserResourceProfile().getCredentialStoreToken();
}
} else if (isSetGroupResourceProfile() &&
- groupComputeResourcePreference != null &&
- isValid(groupComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
- return groupComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+ getGroupComputeResourcePreference() != null &&
+ isValid(getGroupComputeResourcePreference().getResourceSpecificCredentialStoreToken())) {
+ return getGroupComputeResourcePreference().getResourceSpecificCredentialStoreToken();
} else {
- return groupResourceProfile.getDefaultCredentialStoreToken();
+ return getGroupResourceProfile().getDefaultCredentialStoreToken();
}
}
- public String getStorageResourceCredentialToken(){
- if (isValid(gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken())) {
- return gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken();
+ public String getStorageResourceCredentialToken() throws Exception {
+ if (isValid(getGatewayStorageResourcePreference().getResourceSpecificCredentialStoreToken())) {
+ return getGatewayStorageResourcePreference().getResourceSpecificCredentialStoreToken();
} else {
- return gatewayResourceProfile.getCredentialStoreToken();
+ return getGatewayResourceProfile().getCredentialStoreToken(); // field is no longer pre-populated at build time; use the lazy getter
}
}
- public JobSubmissionProtocol getPreferredJobSubmissionProtocol(){
+ public JobSubmissionProtocol getPreferredJobSubmissionProtocol() throws Exception {
return getJobSubmissionProtocol();
}
- public DataMovementProtocol getPreferredDataMovementProtocol() {
+ public DataMovementProtocol getPreferredDataMovementProtocol() throws Exception {
return getDataMovementProtocol();
}
@@ -536,8 +624,11 @@ public class TaskContext {
return processModel.getExperimentId();
}
- public StorageResourceDescription getStorageResourceDescription() {
- return storageResourceDescription;
+ public StorageResourceDescription getStorageResourceDescription() throws Exception {
+ if (this.storageResourceDescription == null) {
+ this.storageResourceDescription = registryClient.getStorageResource(getStorageResourceId());
+ }
+ return this.storageResourceDescription;
}
public void setStorageResourceDescription(StorageResourceDescription storageResourceDescription) {
@@ -552,31 +643,31 @@ public class TaskContext {
return getProcessModel().isSetGroupResourceProfileId();
}
- public String getComputeResourceLoginUserName() {
+ public String getComputeResourceLoginUserName() throws Exception {
if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getLoginUserName())) {
- return userComputeResourcePreference.getLoginUserName();
+ getUserComputeResourcePreference() != null &&
+ isValid(getUserComputeResourcePreference().getLoginUserName())) {
+ return getUserComputeResourcePreference().getLoginUserName();
} else if (isValid(processModel.getProcessResourceSchedule().getOverrideLoginUserName())) {
return processModel.getProcessResourceSchedule().getOverrideLoginUserName();
} else if (isSetGroupResourceProfile() &&
- groupComputeResourcePreference != null &&
- isValid(groupComputeResourcePreference.getLoginUserName())){
- return groupComputeResourcePreference.getLoginUserName();
+ getGroupComputeResourcePreference() != null &&
+ isValid(getGroupComputeResourcePreference().getLoginUserName())){
+ return getGroupComputeResourcePreference().getLoginUserName();
}
throw new RuntimeException("Can't find login username for compute resource");
}
- public String getStorageResourceLoginUserName(){
- return gatewayStorageResourcePreference.getLoginUserName();
+ public String getStorageResourceLoginUserName() throws Exception {
+ return getGatewayStorageResourcePreference().getLoginUserName();
}
- public String getStorageFileSystemRootLocation(){
- return gatewayStorageResourcePreference.getFileSystemRootLocation();
+ public String getStorageFileSystemRootLocation() throws Exception {
+ return getGatewayStorageResourcePreference().getFileSystemRootLocation();
}
- public String getStorageResourceId() {
- return gatewayStorageResourcePreference.getStorageResourceId();
+ public String getStorageResourceId() throws Exception {
+ return getGatewayStorageResourcePreference().getStorageResourceId();
}
private ComputationalResourceSchedulingModel getProcessCRSchedule() {
@@ -622,31 +713,31 @@ public class TaskContext {
return str != null && !str.trim().isEmpty();
}
- public String getAllocationProjectNumber() {
+ public String getAllocationProjectNumber() throws Exception {
if (isValid(processModel.getProcessResourceSchedule().getOverrideAllocationProjectNumber())) {
return processModel.getProcessResourceSchedule().getOverrideAllocationProjectNumber();
} else if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- userComputeResourcePreference.getAllocationProjectNumber() != null) {
- return userComputeResourcePreference.getAllocationProjectNumber();
+ getUserComputeResourcePreference() != null &&
+ getUserComputeResourcePreference().getAllocationProjectNumber() != null) {
+ return getUserComputeResourcePreference().getAllocationProjectNumber();
} else if (isSetGroupResourceProfile() &&
- groupComputeResourcePreference != null &&
- isValid(groupComputeResourcePreference.getAllocationProjectNumber())){
- return groupComputeResourcePreference.getAllocationProjectNumber();
+ getGroupComputeResourcePreference() != null &&
+ isValid(getGroupComputeResourcePreference().getAllocationProjectNumber())){
+ return getGroupComputeResourcePreference().getAllocationProjectNumber();
} else {
return null;
}
}
- public String getReservation() {
+ public String getReservation() throws Exception {
long start = 0, end = 0;
String reservation = null;
if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getReservation())) {
- reservation = userComputeResourcePreference.getReservation();
- start = userComputeResourcePreference.getReservationStartTime();
- end = userComputeResourcePreference.getReservationEndTime();
+ getUserComputeResourcePreference() != null &&
+ isValid(getUserComputeResourcePreference().getReservation())) {
+ reservation = getUserComputeResourcePreference().getReservation();
+ start = getUserComputeResourcePreference().getReservationStartTime();
+ end = getUserComputeResourcePreference().getReservationEndTime();
}
if (reservation != null && start > 0 && start < end) {
long now = Calendar.getInstance().getTimeInMillis();
@@ -656,42 +747,42 @@ public class TaskContext {
}
String queueName = getQueueName();
ComputeResourceReservation computeResourceReservation = GroupComputeResourcePreferenceUtil
- .getActiveReservationForQueue(groupComputeResourcePreference, queueName);
+ .getActiveReservationForQueue(getGroupComputeResourcePreference(), queueName);
if (computeResourceReservation != null) {
return computeResourceReservation.getReservationName();
}
return null;
}
- public String getQualityOfService() {
+ public String getQualityOfService() throws Exception {
if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getQualityOfService())) {
- return userComputeResourcePreference.getQualityOfService();
+ getUserComputeResourcePreference() != null &&
+ isValid(getUserComputeResourcePreference().getQualityOfService())) {
+ return getUserComputeResourcePreference().getQualityOfService();
} else {
- return groupComputeResourcePreference.getQualityOfService();
+ return getGroupComputeResourcePreference().getQualityOfService();
}
}
- public String getQueueName() {
+ public String getQueueName() throws Exception {
if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getPreferredBatchQueue())) {
- return userComputeResourcePreference.getPreferredBatchQueue();
+ getUserComputeResourcePreference() != null &&
+ isValid(getUserComputeResourcePreference().getPreferredBatchQueue())) {
+ return getUserComputeResourcePreference().getPreferredBatchQueue();
} else if (isValid(processModel.getProcessResourceSchedule().getQueueName())) {
return processModel.getProcessResourceSchedule().getQueueName();
} else {
- Optional<BatchQueue> defaultQueue = computeResourceDescription.getBatchQueues().stream().filter(q -> q.isIsDefaultQueue()).findFirst();
+ Optional<BatchQueue> defaultQueue = getComputeResourceDescription().getBatchQueues().stream().filter(q -> q.isIsDefaultQueue()).findFirst();
if (defaultQueue.isPresent()) {
return defaultQueue.get().getQueueName();
} else {
- throw new RuntimeException("Can't find default queue for resource " + computeResourceDescription.getComputeResourceId());
+ throw new RuntimeException("Can't find default queue for resource " + getComputeResourceDescription().getComputeResourceId());
}
}
}
- public List<String> getQueueSpecificMacros() {
+ public List<String> getQueueSpecificMacros() throws Exception {
String queueName = getProcessCRSchedule().getQueueName();
Optional<BatchQueue> queue = getComputeResourceDescription().getBatchQueues().stream()
.filter(x->x.getQueueName().equals(queueName)).findFirst();
@@ -703,7 +794,7 @@ public class TaskContext {
return null;
}
- public JobSubmissionInterface getPreferredJobSubmissionInterface() throws TaskOnFailException {
+ public JobSubmissionInterface getPreferredJobSubmissionInterface() throws Exception {
JobSubmissionProtocol preferredJobSubmissionProtocol = getJobSubmissionProtocol();
ComputeResourceDescription resourceDescription = getComputeResourceDescription();
List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces();
@@ -792,84 +883,6 @@ public class TaskContext {
ctx.setRegistryClient(registryClient);
ctx.setProcessModel(processModel);
ctx.setProfileClient(profileClient);
-
- ctx.setGroupComputeResourcePreference(registryClient.getGroupComputeResourcePreference(processModel.getComputeResourceId(),
- processModel.getGroupResourceProfileId()));
-
- ctx.setGroupResourceProfile(registryClient.getGroupResourceProfile(processModel.getGroupResourceProfileId()));
-
- ctx.setGatewayResourceProfile(
- Optional.ofNullable(registryClient.getGatewayResourceProfile(gatewayId))
- .orElseThrow(() -> new Exception("Invalid GatewayResourceProfile")));
-
- logger.debug("Using storage resource preference for storage " + processModel.getStorageResourceId());
- ctx.setGatewayStorageResourcePreference(
- Optional.ofNullable(registryClient.getGatewayStoragePreference(
- gatewayId,
- processModel.getStorageResourceId()))
- .orElseThrow(() -> new Exception("Invalid Gateway StoragePreference")));
-
- logger.debug("Using application deployment " + processModel.getApplicationDeploymentId());
- ctx.setApplicationDeploymentDescription(
- Optional.ofNullable(registryClient.getApplicationDeployment(
- processModel.getApplicationDeploymentId()))
- .orElseThrow(() -> new Exception("Invalid Application Deployment")));
-
- logger.debug("Using application interface " + processModel.getApplicationInterfaceId());
- ctx.setApplicationInterfaceDescription(
- Optional.ofNullable(registryClient.getApplicationInterface(
- processModel.getApplicationInterfaceId()))
- .orElseThrow(() -> new Exception("Invalid Application Interface")));
-
- logger.debug("Using compute resource " + ctx.getComputeResourceId());
- ctx.setComputeResourceDescription(
- Optional.ofNullable(registryClient.getComputeResource(
- ctx.getComputeResourceId()))
- .orElseThrow(() -> new Exception("Invalid Compute Resource Description")));
-
- logger.debug("Using storage resource " + ctx.getStorageResourceId());
- ctx.setStorageResourceDescription(
- Optional.ofNullable(registryClient.getStorageResource(
- ctx.getStorageResourceId()))
- .orElseThrow(() -> new Exception("Invalid Storage Resource Description")));
-
- if (processModel.isUseUserCRPref()) {
- ctx.setUserResourceProfile(registryClient.getUserResourceProfile(processModel.getUserName(), gatewayId));
- ctx.setUserComputeResourcePreference(registryClient.getUserComputeResourcePreference(
- processModel.getUserName(),
- gatewayId,
- processModel.getComputeResourceId()));
- }
-
- List<OutputDataObjectType> applicationOutputs = ctx.getApplicationInterfaceDescription().getApplicationOutputs();
- if (applicationOutputs != null && !applicationOutputs.isEmpty()) {
- for (OutputDataObjectType outputDataObjectType : applicationOutputs) {
- if (outputDataObjectType.getType().equals(DataType.STDOUT)) {
- if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) {
- String stdOut = (ctx.getWorkingDir().endsWith(File.separator) ? ctx.getWorkingDir() : ctx.getWorkingDir() + File.separator)
- + ctx.getApplicationInterfaceDescription().getApplicationName() + ".stdout";
- outputDataObjectType.setValue(stdOut);
- ctx.setStdoutLocation(stdOut);
- } else {
- ctx.setStdoutLocation(outputDataObjectType.getValue());
- }
- }
- if (outputDataObjectType.getType().equals(DataType.STDERR)) {
- if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) {
- String stderrLocation = (ctx.getWorkingDir().endsWith(File.separator) ? ctx.getWorkingDir() : ctx.getWorkingDir() + File.separator)
- + ctx.getApplicationInterfaceDescription().getApplicationName() + ".stderr";
- outputDataObjectType.setValue(stderrLocation);
- ctx.setStderrLocation(stderrLocation);
- } else {
- ctx.setStderrLocation(outputDataObjectType.getValue());
- }
- }
- }
- }
-
- // TODO move this to some where else as this is not the correct place to do so
- registryClient.updateProcess(processModel, processId);
- processModel.setProcessOutputs(applicationOutputs);
return ctx;
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
index 1df76a2..edc4f1c 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
@@ -69,7 +69,7 @@ public abstract class DataStagingTask extends AiravataTask {
}
@SuppressWarnings("WeakerAccess")
- protected StorageResourceDescription getStorageResource() throws TaskOnFailException {
+ protected StorageResourceDescription getStorageResource() throws Exception {
StorageResourceDescription storageResource = getTaskContext().getStorageResourceDescription();
if (storageResource == null) {
throw new TaskOnFailException("Storage resource can not be null for task " + getTaskId(), false, null);
@@ -79,7 +79,9 @@ public abstract class DataStagingTask extends AiravataTask {
@SuppressWarnings("WeakerAccess")
protected StorageResourceAdaptor getStorageAdaptor(AdaptorSupport adaptorSupport) throws TaskOnFailException {
+ String storageId = null;
try {
+ storageId = getTaskContext().getStorageResourceId();
StorageResourceAdaptor storageResourceAdaptor = adaptorSupport.fetchStorageAdaptor(
getGatewayId(),
getTaskContext().getStorageResourceId(),
@@ -91,23 +93,25 @@ public abstract class DataStagingTask extends AiravataTask {
throw new TaskOnFailException("Storage resource adaptor for " + getTaskContext().getStorageResourceId() + " can not be null", true, null);
}
return storageResourceAdaptor;
- } catch (AgentException e) {
- throw new TaskOnFailException("Failed to obtain adaptor for storage resource " + getTaskContext().getStorageResourceId() +
+ } catch (Exception e) {
+ throw new TaskOnFailException("Failed to obtain adaptor for storage resource " + storageId +
" in task " + getTaskId(), false, e);
}
}
@SuppressWarnings("WeakerAccess")
protected AgentAdaptor getComputeResourceAdaptor(AdaptorSupport adaptorSupport) throws TaskOnFailException {
+ String computeId = null;
try {
+ computeId = getTaskContext().getComputeResourceId();
return adaptorSupport.fetchAdaptor(
getTaskContext().getGatewayId(),
- getTaskContext().getComputeResourceId(),
+ computeId,
getTaskContext().getJobSubmissionProtocol(),
getTaskContext().getComputeResourceCredentialToken(),
getTaskContext().getComputeResourceLoginUserName());
} catch (Exception e) {
- throw new TaskOnFailException("Failed to obtain adaptor for compute resource " + getTaskContext().getComputeResourceId() +
+ throw new TaskOnFailException("Failed to obtain adaptor for compute resource " + computeId +
" in task " + getTaskId(), false, e);
}
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
index 4a76e1d..a7f8ed9 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
@@ -57,16 +57,18 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
defaultJSTaskCounter.inc();
String jobId = null;
AgentAdaptor adaptor;
+ String computeId = null;
try {
+ computeId = getTaskContext().getComputeResourceId();
adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
getTaskContext().getGatewayId(),
- getTaskContext().getComputeResourceId(),
+ computeId,
getTaskContext().getJobSubmissionProtocol(),
getTaskContext().getComputeResourceCredentialToken(),
getTaskContext().getComputeResourceLoginUserName());
} catch (Exception e) {
- return onFail("Failed to fetch adaptor to connect to " + getTaskContext().getComputeResourceId(), true, e);
+ return onFail("Failed to fetch adaptor to connect to " + computeId, true, e);
}
try {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/GroovyMapBuilder.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/GroovyMapBuilder.java
index 14c72e5..dfca629 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/GroovyMapBuilder.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/GroovyMapBuilder.java
@@ -382,8 +382,7 @@ public class GroovyMapBuilder {
}
}
- private static void setMailAddresses(TaskContext taskContext, GroovyMapData groovyMap) throws
- ApplicationSettingsException, TException, TaskOnFailException {
+ private static void setMailAddresses(TaskContext taskContext, GroovyMapData groovyMap) throws Exception {
ProcessModel processModel = taskContext.getProcessModel();
String emailIds = null;
@@ -420,7 +419,7 @@ public class GroovyMapBuilder {
}
}
- public static boolean isEmailBasedJobMonitor(TaskContext taskContext) throws TException, TaskOnFailException {
+ public static boolean isEmailBasedJobMonitor(TaskContext taskContext) throws Exception {
JobSubmissionProtocol jobSubmissionProtocol = taskContext.getPreferredJobSubmissionProtocol();
JobSubmissionInterface jobSubmissionInterface = taskContext.getPreferredJobSubmissionInterface();
if (jobSubmissionProtocol == JobSubmissionProtocol.SSH) {