You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:09:59 UTC
[airavata] 02/17: Building groovy map
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit ef94a5afd073359b0d75b72f51b254670953b7e2
Author: dimuthu <di...@gmail.com>
AuthorDate: Thu Feb 22 09:37:36 2018 -0500
Building groovy map
---
modules/helix-spectator/pom.xml | 5 +
.../airavata/helix/impl/task/AiravataTask.java | 156 +---
.../airavata/helix/impl/task/EnvSetupTask.java | 6 +-
.../airavata/helix/impl/task/TaskContext.java | 802 +++++++++++++++++++++
.../impl/task/submission/GroovyMapBuilder.java | 335 +++++++++
.../helix/impl/task/submission/GroovyMapData.java | 51 +-
.../submission/task/DefaultJobSubmissionTask.java | 37 +-
.../submission/task/ForkJobSubmissionTask.java | 21 +-
.../task/submission/task/JobSubmissionTask.java | 69 +-
.../submission/task/LocalJobSubmissionTask.java | 10 +-
.../helix/impl/workflow/SimpleWorkflow.java | 2 +-
.../src/main/resources/application.properties | 2 +-
12 files changed, 1275 insertions(+), 221 deletions(-)
diff --git a/modules/helix-spectator/pom.xml b/modules/helix-spectator/pom.xml
index bae2785..36fb586 100644
--- a/modules/helix-spectator/pom.xml
+++ b/modules/helix-spectator/pom.xml
@@ -45,6 +45,11 @@
<artifactId>mariadb-java-client</artifactId>
<version>1.1.7</version>
</dependency>
+ <dependency>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy-templates</artifactId>
+ <version>2.4.7</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 72d3e17..315c07c 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -39,10 +39,8 @@ public abstract class AiravataTask extends AbstractTask {
private ProcessModel processModel;
private ComputeResourceDescription computeResourceDescription;
- private ComputeResourcePreference gatewayComputeResourcePreference;
- private UserComputeResourcePreference userComputeResourcePreference;
- private UserResourceProfile userResourceProfile;
- private GatewayResourceProfile gatewayResourceProfile;
+
+ private TaskContext taskContext;
@TaskParam(name = "Process Id")
private String processId;
@@ -87,22 +85,28 @@ public abstract class AiravataTask extends AbstractTask {
this.computeResourceDescription = getAppCatalog().getComputeResource().getComputeResource(getProcessModel()
.getComputeResourceId());
- this.gatewayComputeResourcePreference = getAppCatalog().getGatewayProfile()
- .getComputeResourcePreference(getGatewayId(), computeResourceDescription.getComputeResourceId());
-
- this.userComputeResourcePreference = getAppCatalog().getUserResourceProfile()
- .getUserComputeResourcePreference(getProcessModel().getUserName(), getGatewayId(), getProcessModel()
- .getComputeResourceId());
-
- this.userResourceProfile = getAppCatalog().getUserResourceProfile()
- .getUserResourceProfile(getProcessModel().getUserName(), getGatewayId());
-
- this.gatewayResourceProfile = getAppCatalog().getGatewayProfile().getGatewayProfile(getGatewayId());
+ TaskContext.TaskContextBuilder taskContextBuilder = new TaskContext.TaskContextBuilder(getProcessId(), getGatewayId(), getTaskId());
+ taskContextBuilder.setAppCatalog(getAppCatalog());
+ taskContextBuilder.setExperimentCatalog(getExperimentCatalog());
+ taskContextBuilder.setProcessModel(getProcessModel());
+ taskContextBuilder.setStatusPublisher(getStatusPublisher());
+
+ taskContextBuilder.setGatewayResourceProfile(appCatalog.getGatewayProfile().getGatewayProfile(gatewayId));
+ taskContextBuilder.setGatewayComputeResourcePreference(
+ appCatalog.getGatewayProfile()
+ .getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()));
+ taskContextBuilder.setGatewayStorageResourcePreference(
+ appCatalog.getGatewayProfile()
+ .getStoragePreference(gatewayId, processModel.getStorageResourceId()));
+
+ this.taskContext = taskContextBuilder.build();
} catch (AppCatalogException e) {
e.printStackTrace();
} catch (RegistryException e) {
e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
@@ -125,140 +129,27 @@ public abstract class AiravataTask extends AbstractTask {
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
}
-
- ///////////////////
-
- public String getComputeResourceId() {
- if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getComputeResourceId())) {
- return userComputeResourcePreference.getComputeResourceId();
- } else {
- return gatewayComputeResourcePreference.getComputeResourceId();
- }
- }
-
- public String getComputeResourceCredentialToken(){
- if (isUseUserCRPref()) {
- if (userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
- return userComputeResourcePreference.getResourceSpecificCredentialStoreToken();
- } else {
- return userResourceProfile.getCredentialStoreToken();
- }
- } else {
- if (isValid(gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
- return gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken();
- } else {
- return gatewayResourceProfile.getCredentialStoreToken();
- }
- }
- }
-
- public String getComputeResourceLoginUserName(){
- if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getLoginUserName())) {
- return userComputeResourcePreference.getLoginUserName();
- } else if (isValid(getProcessModel().getProcessResourceSchedule().getOverrideLoginUserName())) {
- return getProcessModel().getProcessResourceSchedule().getOverrideLoginUserName();
- } else {
- return gatewayComputeResourcePreference.getLoginUserName();
- }
- }
-
- public JobSubmissionInterface getPreferredJobSubmissionInterface() throws AppCatalogException {
- try {
- JobSubmissionProtocol preferredJobSubmissionProtocol = getJobSubmissionProtocol();
- ComputeResourceDescription resourceDescription = getComputeResourceDescription();
- List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces();
- Map<JobSubmissionProtocol, List<JobSubmissionInterface>> orderedInterfaces = new HashMap<>();
- List<JobSubmissionInterface> interfaces = new ArrayList<>();
- if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) {
- for (JobSubmissionInterface submissionInterface : jobSubmissionInterfaces){
-
- if (preferredJobSubmissionProtocol != null){
- if (preferredJobSubmissionProtocol.toString().equals(submissionInterface.getJobSubmissionProtocol().toString())){
- if (orderedInterfaces.containsKey(submissionInterface.getJobSubmissionProtocol())){
- List<JobSubmissionInterface> interfaceList = orderedInterfaces.get(submissionInterface.getJobSubmissionProtocol());
- interfaceList.add(submissionInterface);
- }else {
- interfaces.add(submissionInterface);
- orderedInterfaces.put(submissionInterface.getJobSubmissionProtocol(), interfaces);
- }
- }
- }else {
- Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
- @Override
- public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
- return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
- }
- });
- }
- }
- interfaces = orderedInterfaces.get(preferredJobSubmissionProtocol);
- Collections.sort(interfaces, new Comparator<JobSubmissionInterface>() {
- @Override
- public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
- return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
- }
- });
- } else {
- throw new AppCatalogException("Compute resource should have at least one job submission interface defined...");
- }
- return interfaces.get(0);
- } catch (AppCatalogException e) {
- throw new AppCatalogException("Error occurred while retrieving data from app catalog", e);
- }
- }
-
//////////////////////////
-
- protected boolean isValid(String str) {
- return str != null && !str.trim().isEmpty();
- }
-
- public boolean isUseUserCRPref() {
- return getProcessModel().isUseUserCRPref();
- }
-
- public JobSubmissionProtocol getJobSubmissionProtocol() {
- return getGatewayComputeResourcePreference().getPreferredJobSubmissionProtocol();
- }
-
- public ComputeResourcePreference getGatewayComputeResourcePreference() {
- return gatewayComputeResourcePreference;
- }
-
-
public ComputeResourceDescription getComputeResourceDescription() {
return computeResourceDescription;
}
////////////////////////
-
- public void setAppCatalog(AppCatalog appCatalog) {
- this.appCatalog = appCatalog;
+
+ public TaskContext getTaskContext() {
+ return taskContext;
}
public ExperimentCatalog getExperimentCatalog() {
return experimentCatalog;
}
- public void setExperimentCatalog(ExperimentCatalog experimentCatalog) {
- this.experimentCatalog = experimentCatalog;
- }
-
public Publisher getStatusPublisher() {
return statusPublisher;
}
- public void setStatusPublisher(Publisher statusPublisher) {
- this.statusPublisher = statusPublisher;
- }
-
public String getProcessId() {
return processId;
}
@@ -287,7 +178,4 @@ public abstract class AiravataTask extends AbstractTask {
return processModel;
}
- public void setProcessModel(ProcessModel processModel) {
- this.processModel = processModel;
- }
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
index 1cab0e2..f079b9f 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
@@ -23,8 +23,10 @@ public class EnvSetupTask extends AiravataTask {
public TaskResult onRun(TaskHelper taskHelper) {
try {
publishTaskState(TaskState.EXECUTING);
- AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(),
- getJobSubmissionProtocol().name(), getComputeResourceCredentialToken());
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getComputeResourceId(),
+ getTaskContext().getJobSubmissionProtocol().name(),
+ getTaskContext().getComputeResourceCredentialToken());
adaptor.createDirectory(workingDirectory);
publishTaskState(TaskState.COMPLETED);
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
new file mode 100644
index 0000000..7de738e
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -0,0 +1,802 @@
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
+import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserStoragePreference;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class TaskContext {
+
+ private static final Logger log = LoggerFactory.getLogger(TaskContext.class);
+ // process model
+ private ExperimentCatalog experimentCatalog;
+ private AppCatalog appCatalog;
+ private Publisher statusPublisher;
+ private final String processId;
+ private final String gatewayId;
+ //private final String tokenId;
+ private ProcessModel processModel;
+ private String workingDir;
+ private String scratchLocation;
+ private String inputDir;
+ private String outputDir;
+ private String localWorkingDir;
+ private GatewayResourceProfile gatewayResourceProfile;
+ private ComputeResourcePreference gatewayComputeResourcePreference;
+ private StoragePreference gatewayStorageResourcePreference;
+ private UserResourceProfile userResourceProfile;
+ private UserComputeResourcePreference userComputeResourcePreference;
+ private UserStoragePreference userStoragePreference;
+ private ComputeResourceDescription computeResourceDescription;
+ private ApplicationDeploymentDescription applicationDeploymentDescription;
+ private ApplicationInterfaceDescription applicationInterfaceDescription;
+ private Map<String, String> sshProperties;
+ private String stdoutLocation;
+ private String stderrLocation;
+ private JobSubmissionProtocol jobSubmissionProtocol;
+ private DataMovementProtocol dataMovementProtocol;
+ private JobModel jobModel;
+ private StorageResourceDescription storageResource;
+ private MonitorMode monitorMode;
+ private ResourceJobManager resourceJobManager;
+ private boolean handOver;
+ private boolean cancel;
+ private List<String> taskExecutionOrder;
+ private List<TaskModel> taskList;
+ private Map<String, TaskModel> taskMap;
+ private boolean pauseTaskExecution = false; // Task can pause task execution by setting this value
+ private boolean complete = false; // all tasks executed?
+ private boolean recovery = false; // is process in recovery mode?
+ private TaskModel currentExecutingTaskModel; // task model currently being executed; if process execution is paused, this is needed to resume it
+ private boolean acknowledge;
+ private boolean recoveryWithCancel = false;
+ private String usageReportingGatewayId;
+ private List<String> queueSpecificMacros;
+ private String taskId;
+ private Object subTaskModel = null;
+
+
+ /**
+ * Note: task context properties use a lazy loading approach. At runtime some properties will be null
+ * until they have been accessed. Once such a property is accessed through the API, it is set to the correct value.
+ */
+ private TaskContext(String taskId, String processId, String gatewayId) {
+ this.processId = processId;
+ this.gatewayId = gatewayId;
+ this.taskId = taskId;
+ }
+
+ public ExperimentCatalog getExperimentCatalog() {
+ return experimentCatalog;
+ }
+
+ public void setExperimentCatalog(ExperimentCatalog experimentCatalog) {
+ this.experimentCatalog = experimentCatalog;
+ }
+
+ public AppCatalog getAppCatalog() {
+ return appCatalog;
+ }
+
+ public void setAppCatalog(AppCatalog appCatalog) {
+ this.appCatalog = appCatalog;
+ }
+
+ public String getGatewayId() {
+ return gatewayId;
+ }
+
+ public String getProcessId() {
+ return processId;
+ }
+
+ public Publisher getStatusPublisher() {
+ return statusPublisher;
+ }
+
+ public void setStatusPublisher(Publisher statusPublisher) {
+ this.statusPublisher = statusPublisher;
+ }
+
+ public ProcessModel getProcessModel() {
+ return processModel;
+ }
+
+ public void setProcessModel(ProcessModel processModel) {
+ this.processModel = processModel;
+ }
+
+ public String getWorkingDir() {
+ if (workingDir == null) {
+ if (processModel.getProcessResourceSchedule().getStaticWorkingDir() != null){
+ workingDir = processModel.getProcessResourceSchedule().getStaticWorkingDir();
+ }else {
+ String scratchLocation = getScratchLocation();
+ workingDir = (scratchLocation.endsWith("/") ? scratchLocation + processId : scratchLocation + "/" +
+ processId);
+ }
+ }
+ return workingDir;
+ }
+
+ public String getScratchLocation() {
+ if (scratchLocation == null) {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getScratchLocation())) {
+ scratchLocation = userComputeResourcePreference.getScratchLocation();
+ } else if (isValid(processModel.getProcessResourceSchedule().getOverrideScratchLocation())) {
+ scratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation();
+ }else {
+ scratchLocation = gatewayComputeResourcePreference.getScratchLocation();
+ }
+ }
+ return scratchLocation;
+ }
+
+ public void setWorkingDir(String workingDir) {
+ this.workingDir = workingDir;
+ }
+
+ public GatewayResourceProfile getGatewayResourceProfile() {
+ return gatewayResourceProfile;
+ }
+
+ public void setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile) {
+ this.gatewayResourceProfile = gatewayResourceProfile;
+ }
+
+ public UserResourceProfile getUserResourceProfile() {
+ return userResourceProfile;
+ }
+
+ public void setUserResourceProfile(UserResourceProfile userResourceProfile) {
+ this.userResourceProfile = userResourceProfile;
+ }
+
+ private UserComputeResourcePreference getUserComputeResourcePreference() {
+ return userComputeResourcePreference;
+ }
+
+ public void setUserComputeResourcePreference(UserComputeResourcePreference userComputeResourcePreference) {
+ this.userComputeResourcePreference = userComputeResourcePreference;
+ }
+
+ public UserStoragePreference getUserStoragePreference() {
+ return userStoragePreference;
+ }
+
+ public void setUserStoragePreference(UserStoragePreference userStoragePreference) {
+ this.userStoragePreference = userStoragePreference;
+ }
+
+ public StoragePreference getGatewayStorageResourcePreference() {
+ return gatewayStorageResourcePreference;
+ }
+
+ public void setGatewayStorageResourcePreference(StoragePreference gatewayStorageResourcePreference) {
+ this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
+ }
+
+
+ public Map<String, String> getSshProperties() {
+ return sshProperties;
+ }
+
+ public void setSshProperties(Map<String, String> sshProperties) {
+ this.sshProperties = sshProperties;
+ }
+
+ public ComputeResourceDescription getComputeResourceDescription() {
+ return computeResourceDescription;
+ }
+
+ public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) {
+ this.computeResourceDescription = computeResourceDescription;
+ }
+
+ public ApplicationDeploymentDescription getApplicationDeploymentDescription() {
+ return applicationDeploymentDescription;
+ }
+
+ public void setApplicationDeploymentDescription(ApplicationDeploymentDescription
+ applicationDeploymentDescription) {
+ this.applicationDeploymentDescription = applicationDeploymentDescription;
+ }
+
+ public ApplicationInterfaceDescription getApplicationInterfaceDescription() {
+ return applicationInterfaceDescription;
+ }
+
+ public void setApplicationInterfaceDescription(ApplicationInterfaceDescription applicationInterfaceDescription) {
+ this.applicationInterfaceDescription = applicationInterfaceDescription;
+ }
+
+ public String getStdoutLocation() {
+ return stdoutLocation;
+ }
+
+ public void setStdoutLocation(String stdoutLocation) {
+ this.stdoutLocation = stdoutLocation;
+ }
+
+ public String getStderrLocation() {
+ return stderrLocation;
+ }
+
+ public void setStderrLocation(String stderrLocation) {
+ this.stderrLocation = stderrLocation;
+ }
+
+ public void setOutputDir(String outputDir) {
+ this.outputDir = outputDir;
+ }
+
+ public String getOutputDir() {
+ if (outputDir == null) {
+ outputDir = getWorkingDir();
+ }
+ return outputDir;
+ }
+
+ public String getInputDir() {
+ if (inputDir == null) {
+ inputDir = getWorkingDir();
+ }
+ return inputDir;
+ }
+
+ public void setInputDir(String inputDir) {
+ this.inputDir = inputDir;
+ }
+
+ public JobSubmissionProtocol getJobSubmissionProtocol() {
+ if (jobSubmissionProtocol == null) {
+ jobSubmissionProtocol = gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
+ }
+ return jobSubmissionProtocol;
+ }
+
+ public void setJobSubmissionProtocol(JobSubmissionProtocol jobSubmissionProtocol) {
+ this.jobSubmissionProtocol = jobSubmissionProtocol;
+ }
+
+ public DataMovementProtocol getDataMovementProtocol() {
+ if (dataMovementProtocol == null) {
+ dataMovementProtocol = gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
+ }
+ return dataMovementProtocol;
+ }
+
+ public void setDataMovementProtocol(DataMovementProtocol dataMovementProtocol) {
+ this.dataMovementProtocol = dataMovementProtocol;
+ }
+
+ public String getTaskDag() {
+ return getProcessModel().getTaskDag();
+ }
+
+ /**
+  * Returns the tasks of the process model, caching the list on first access.
+  *
+  * <p>Initialization is guarded by this instance's monitor rather than a class-wide lock, so separate
+  * contexts do not contend with each other.
+  *
+  * @return the value of {@code getProcessModel().getTasks()} captured on first access
+  */
+ public synchronized List<TaskModel> getTaskList() {
+     if (taskList == null) {
+         taskList = getProcessModel().getTasks();
+     }
+     return taskList;
+ }
+
+
+ public List<String> getTaskExecutionOrder() {
+ return taskExecutionOrder;
+ }
+
+ public void setTaskExecutionOrder(List<String> taskExecutionOrder) {
+ this.taskExecutionOrder = taskExecutionOrder;
+ }
+
+ public Map<String, TaskModel> getTaskMap() {
+ if (taskMap == null) {
+ synchronized (TaskModel.class) {
+ if (taskMap == null) {
+ taskMap = new HashMap<>();
+ for (TaskModel taskModel : getTaskList()) {
+ taskMap.put(taskModel.getTaskId(), taskModel);
+ }
+ }
+ }
+ }
+ return taskMap;
+ }
+
+ public JobModel getJobModel() {
+ if (jobModel == null) {
+ jobModel = new JobModel();
+ jobModel.setProcessId(processId);
+ jobModel.setWorkingDir(getWorkingDir());
+ jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ }
+ return jobModel;
+ }
+
+ public void setJobModel(JobModel jobModel) {
+ this.jobModel = jobModel;
+ }
+
+ public ComputeResourcePreference getGatewayComputeResourcePreference() {
+ return gatewayComputeResourcePreference;
+ }
+
+ public void setGatewayComputeResourcePreference(ComputeResourcePreference gatewayComputeResourcePreference) {
+ this.gatewayComputeResourcePreference = gatewayComputeResourcePreference;
+ }
+
+ public ProcessState getProcessState() {
+ if(processModel.getProcessStatuses() != null && processModel.getProcessStatuses().size() > 0)
+ return processModel.getProcessStatuses().get(0).getState();
+ else
+ return null;
+ }
+
+ public void setProcessStatus(ProcessStatus status) {
+ if (status != null) {
+ log.info("expId: {}, processId: {} :- Process status changed {} -> {}", getExperimentId(), processId,
+ getProcessState().name(), status.getState().name());
+ List<ProcessStatus> processStatuses = new ArrayList<>();
+ processStatuses.add(status);
+ processModel.setProcessStatuses(processStatuses);
+ }
+ }
+
+ public ProcessStatus getProcessStatus(){
+ if(processModel.getProcessStatuses() != null)
+ return processModel.getProcessStatuses().get(0);
+ else
+ return null;
+ }
+
+ public String getComputeResourceId() {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getComputeResourceId())) {
+ return userComputeResourcePreference.getComputeResourceId();
+ } else {
+ return gatewayComputeResourcePreference.getComputeResourceId();
+ }
+ }
+
+ public String getComputeResourceCredentialToken(){
+ if (isUseUserCRPref()) {
+ if (userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
+ return userComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+ } else {
+ return userResourceProfile.getCredentialStoreToken();
+ }
+ } else {
+ if (isValid(gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
+ return gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+ } else {
+ return gatewayResourceProfile.getCredentialStoreToken();
+ }
+ }
+ }
+
+ public String getStorageResourceCredentialToken(){
+ if (isValid(gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken())) {
+ return gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken();
+ } else {
+ return gatewayResourceProfile.getCredentialStoreToken();
+ }
+ }
+
+ public JobSubmissionProtocol getPreferredJobSubmissionProtocol(){
+ return gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
+ }
+
+ public DataMovementProtocol getPreferredDataMovementProtocol() {
+ return gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
+ }
+
+ public void setMonitorMode(MonitorMode monitorMode) {
+ this.monitorMode = monitorMode;
+ }
+
+ public MonitorMode getMonitorMode() {
+ return monitorMode;
+ }
+
+ public void setResourceJobManager(ResourceJobManager resourceJobManager) {
+ this.resourceJobManager = resourceJobManager;
+ }
+
+ public ResourceJobManager getResourceJobManager() {
+ return resourceJobManager;
+ }
+
+ public String getLocalWorkingDir() {
+ return localWorkingDir;
+ }
+
+ public void setLocalWorkingDir(String localWorkingDir) {
+ this.localWorkingDir = localWorkingDir;
+ }
+
+ public String getExperimentId() {
+ return processModel.getExperimentId();
+ }
+
+ public boolean isHandOver() {
+ return handOver;
+ }
+
+ public void setHandOver(boolean handOver) {
+ this.handOver = handOver;
+ }
+
+ public boolean isCancel() {
+ return cancel;
+ }
+
+ public void setCancel(boolean cancel) {
+ this.cancel = cancel;
+ }
+
+ public boolean isInterrupted(){
+ return this.cancel || this.handOver;
+ }
+
+ public String getCurrentExecutingTaskId() {
+ if (currentExecutingTaskModel != null) {
+ return currentExecutingTaskModel.getTaskId();
+ }
+ return null;
+ }
+
+ public boolean isPauseTaskExecution() {
+ return pauseTaskExecution;
+ }
+
+ public void setPauseTaskExecution(boolean pauseTaskExecution) {
+ this.pauseTaskExecution = pauseTaskExecution;
+ }
+
+ public boolean isComplete() {
+ return complete;
+ }
+
+ public void setComplete(boolean complete) {
+ this.complete = complete;
+ }
+
+ public boolean isRecovery() {
+ return recovery;
+ }
+
+ public void setRecovery(boolean recovery) {
+ this.recovery = recovery;
+ }
+
+ public TaskModel getCurrentExecutingTaskModel() {
+ return currentExecutingTaskModel;
+ }
+
+ public void setCurrentExecutingTaskModel(TaskModel currentExecutingTaskModel) {
+ this.currentExecutingTaskModel = currentExecutingTaskModel;
+ }
+
+ public StorageResourceDescription getStorageResource() {
+ return storageResource;
+ }
+
+ public void setStorageResource(StorageResourceDescription storageResource) {
+ this.storageResource = storageResource;
+ }
+
+ public void setAcknowledge(boolean acknowledge) {
+ this.acknowledge = acknowledge;
+ }
+
+ public boolean isAcknowledge() {
+ return acknowledge;
+ }
+
+ public boolean isRecoveryWithCancel() {
+ return recoveryWithCancel;
+ }
+
+ public void setRecoveryWithCancel(boolean recoveryWithCancel) {
+ this.recoveryWithCancel = recoveryWithCancel;
+ }
+
+ public boolean isUseUserCRPref() {
+ return getProcessModel().isUseUserCRPref();
+ }
+
+ public String getComputeResourceLoginUserName(){
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getLoginUserName())) {
+ return userComputeResourcePreference.getLoginUserName();
+ } else if (isValid(processModel.getProcessResourceSchedule().getOverrideLoginUserName())) {
+ return processModel.getProcessResourceSchedule().getOverrideLoginUserName();
+ } else {
+ return gatewayComputeResourcePreference.getLoginUserName();
+ }
+ }
+
+ public String getStorageResourceLoginUserName(){
+ return gatewayStorageResourcePreference.getLoginUserName();
+ }
+
+ public String getStorageFileSystemRootLocation(){
+ return gatewayStorageResourcePreference.getFileSystemRootLocation();
+ }
+
+ public String getStorageResourceId() {
+ return gatewayStorageResourcePreference.getStorageResourceId();
+ }
+
+ private ComputationalResourceSchedulingModel getProcessCRSchedule() {
+ if (getProcessModel() != null) {
+ return getProcessModel().getProcessResourceSchedule();
+ } else {
+ return null;
+ }
+ }
+
+ private boolean isValid(String str) {
+ return str != null && !str.trim().isEmpty();
+ }
+
+ public String getUsageReportingGatewayId() {
+ return gatewayComputeResourcePreference.getUsageReportingGatewayId();
+ }
+
+ public String getAllocationProjectNumber() {
+ return gatewayComputeResourcePreference.getAllocationProjectNumber();
+ }
+
+ /**
+  * Resolves the reservation to use, taking it from the user compute resource preference when user
+  * preferences are enabled and carry a valid reservation, and from the gateway preference otherwise.
+  *
+  * @return the reservation if it is non-null, its start time is positive and before its end time, and the
+  *         current time lies strictly between start and end; {@code null} otherwise
+  */
+ public String getReservation() {
+     final boolean fromUserPreference = isUseUserCRPref()
+             && userComputeResourcePreference != null
+             && isValid(userComputeResourcePreference.getReservation());
+
+     final String candidate;
+     final long windowStart;
+     final long windowEnd;
+     if (fromUserPreference) {
+         candidate = userComputeResourcePreference.getReservation();
+         windowStart = userComputeResourcePreference.getReservationStartTime();
+         windowEnd = userComputeResourcePreference.getReservationEndTime();
+     } else {
+         candidate = gatewayComputeResourcePreference.getReservation();
+         windowStart = gatewayComputeResourcePreference.getReservationStartTime();
+         windowEnd = gatewayComputeResourcePreference.getReservationEndTime();
+     }
+
+     // Reject a missing reservation or a malformed time window before consulting the clock.
+     if (candidate == null || windowStart <= 0 || windowStart >= windowEnd) {
+         return null;
+     }
+     final long currentMillis = Calendar.getInstance().getTimeInMillis();
+     return (currentMillis > windowStart && currentMillis < windowEnd) ? candidate : null;
+ }
+
+ public String getQualityOfService() {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getQualityOfService())) {
+ return userComputeResourcePreference.getQualityOfService();
+ } else {
+ return gatewayComputeResourcePreference.getQualityOfService();
+ }
+ }
+
+
+ public String getQueueName() {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getPreferredBatchQueue())) {
+ return userComputeResourcePreference.getPreferredBatchQueue();
+ } else if (isValid(processModel.getProcessResourceSchedule().getQueueName())) {
+ return processModel.getProcessResourceSchedule().getQueueName();
+ } else {
+ return gatewayComputeResourcePreference.getPreferredBatchQueue();
+ }
+ }
+
+ /**
+  * Looks up the macros of the batch queue whose name matches the queue name in the process resource
+  * schedule.
+  *
+  * @return the queue's comma-separated macros split into a list, or {@code null} when there is no
+  *         schedule, no batch queue list, no queue with that name, or the queue has no macros
+  */
+ public List<String> getQueueSpecificMacros() {
+     // getProcessCRSchedule() returns null when no process model is set.
+     ComputationalResourceSchedulingModel schedule = getProcessCRSchedule();
+     if (schedule == null) {
+         return null;
+     }
+     List<BatchQueue> batchQueues = getComputeResourceDescription().getBatchQueues();
+     if (batchQueues == null) {
+         return null;
+     }
+     String queueName = schedule.getQueueName();
+     return batchQueues.stream()
+             .filter(queue -> queue.getQueueName().equals(queueName))
+             .findFirst()
+             .map(BatchQueue::getQueueSpecificMacros) // a null macro string yields an empty Optional
+             .filter(macros -> !macros.isEmpty())
+             .map(macros -> Arrays.asList(macros.split(",")))
+             .orElse(null);
+ }
+
+ /**
+  * Selects the job submission interface to use on the compute resource.
+  *
+  * <p>When {@link #getJobSubmissionProtocol()} yields a protocol, only interfaces declaring that protocol
+  * are considered; otherwise every interface of the compute resource is a candidate. Among the candidates
+  * the one with the lowest priority order is returned (the first listed one on ties).
+  *
+  * @return the selected job submission interface, never {@code null}
+  * @throws AppCatalogException if the compute resource defines no job submission interface, or none of
+  *                             them uses the preferred protocol
+  */
+ public JobSubmissionInterface getPreferredJobSubmissionInterface() throws AppCatalogException {
+     JobSubmissionProtocol preferredProtocol = getJobSubmissionProtocol();
+     List<JobSubmissionInterface> definedInterfaces = getComputeResourceDescription().getJobSubmissionInterfaces();
+     if (definedInterfaces == null || definedInterfaces.isEmpty()) {
+         throw new AppCatalogException("Compute resource should have at least one job submission interface defined...");
+     }
+
+     List<JobSubmissionInterface> candidates = new ArrayList<>();
+     for (JobSubmissionInterface submissionInterface : definedInterfaces) {
+         // Without a preferred protocol every defined interface is eligible.
+         if (preferredProtocol == null || preferredProtocol == submissionInterface.getJobSubmissionProtocol()) {
+             candidates.add(submissionInterface);
+         }
+     }
+     if (candidates.isEmpty()) {
+         throw new AppCatalogException("Compute resource has no job submission interface for the preferred protocol "
+                 + preferredProtocol);
+     }
+
+     // comparingInt avoids the overflow of subtraction-based comparison; Collections.min keeps the first
+     // of several equal minima, matching a stable sort followed by get(0).
+     return Collections.min(candidates, Comparator.comparingInt(JobSubmissionInterface::getPriorityOrder));
+ }
+
+ public TaskModel getCurrentTaskModel() {
+ return getTaskMap().get(taskId);
+ }
+
+ /**
+  * Returns the sub task model of the current task, resolving it on first use.
+  *
+  * The value produced by {@code ThriftUtils.getSubTaskModel} is cached in the
+  * {@code subTaskModel} field and reused by later calls.
+  *
+  * @return the cached or freshly resolved sub task model
+  * @throws TException propagated from {@code ThriftUtils.getSubTaskModel}
+  */
+ public Object getSubTaskModel() throws TException {
+     if (subTaskModel != null) {
+         return subTaskModel;
+     }
+     subTaskModel = ThriftUtils.getSubTaskModel(getCurrentTaskModel());
+     return subTaskModel;
+ }
+
+ public static class TaskContextBuilder {
+ private final String processId;
+ private final String gatewayId;
+ private final String taskId;
+ private ExperimentCatalog experimentCatalog;
+ private AppCatalog appCatalog;
+ private Publisher statusPublisher;
+ private GatewayResourceProfile gatewayResourceProfile;
+ private ComputeResourcePreference gatewayComputeResourcePreference;
+ private StoragePreference gatewayStorageResourcePreference;
+ private ProcessModel processModel;
+
+ public TaskContextBuilder(String processId, String gatewayId, String taskId) throws Exception {
+ if (notValid(processId) || notValid(gatewayId) || notValid(taskId)) {
+ throwError("Process Id, Gateway Id and Task Id must be not null");
+ }
+ this.processId = processId;
+ this.gatewayId = gatewayId;
+ this.taskId = taskId;
+ }
+
+ public TaskContextBuilder setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile) {
+ this.gatewayResourceProfile = gatewayResourceProfile;
+ return this;
+ }
+
+ public TaskContextBuilder setGatewayComputeResourcePreference(ComputeResourcePreference gatewayComputeResourcePreference) {
+ this.gatewayComputeResourcePreference = gatewayComputeResourcePreference;
+ return this;
+ }
+
+ public TaskContextBuilder setGatewayStorageResourcePreference(StoragePreference gatewayStorageResourcePreference) {
+ this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
+ return this;
+ }
+
+ public TaskContextBuilder setProcessModel(ProcessModel processModel) {
+ this.processModel = processModel;
+ return this;
+ }
+
+ public TaskContextBuilder setExperimentCatalog(ExperimentCatalog experimentCatalog) {
+ this.experimentCatalog = experimentCatalog;
+ return this;
+ }
+
+ public TaskContextBuilder setAppCatalog(AppCatalog appCatalog) {
+ this.appCatalog = appCatalog;
+ return this;
+ }
+
+ public TaskContextBuilder setStatusPublisher(Publisher statusPublisher) {
+ this.statusPublisher = statusPublisher;
+ return this;
+ }
+
+ public TaskContext build() throws Exception {
+ if (notValid(gatewayResourceProfile)) {
+ throwError("Invalid GatewayResourceProfile");
+ }
+ if (notValid(gatewayComputeResourcePreference)) {
+ throwError("Invalid Gateway ComputeResourcePreference");
+ }
+ if (notValid(gatewayStorageResourcePreference)) {
+ throwError("Invalid Gateway StoragePreference");
+ }
+ if (notValid(processModel)) {
+ throwError("Invalid Process Model");
+ }
+ if (notValid(appCatalog)) {
+ throwError("Invalid AppCatalog");
+ }
+ if (notValid(experimentCatalog)) {
+ throwError("Invalid Experiment catalog");
+ }
+ //if (notValid(statusPublisher)) {
+ // throwError("Invalid Status Publisher");
+ //}
+
+ TaskContext ctx = new TaskContext(processId, gatewayId, taskId);
+ ctx.setAppCatalog(appCatalog);
+ ctx.setExperimentCatalog(experimentCatalog);
+ ctx.setStatusPublisher(statusPublisher);
+ ctx.setProcessModel(processModel);
+ ctx.setGatewayResourceProfile(gatewayResourceProfile);
+ ctx.setGatewayComputeResourcePreference(gatewayComputeResourcePreference);
+ ctx.setGatewayStorageResourcePreference(gatewayStorageResourcePreference);
+
+ return ctx;
+ }
+
+ private boolean notValid(Object value) {
+ return value == null;
+ }
+
+ private void throwError(String msg) throws Exception {
+ throw new Exception(msg);
+ }
+
+ }
+}
+
+
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
new file mode 100644
index 0000000..0b92922
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
@@ -0,0 +1,335 @@
+package org.apache.airavata.helix.impl.task.submission;
+
+import groovy.text.GStringTemplateEngine;
+import groovy.text.TemplateEngine;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.CommandObject;
+import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.parallelism.ApplicationParallelismType;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.task.JobSubmissionTaskModel;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GroovyMapBuilder {
+
+ private static final Logger logger = LogManager.getLogger(GroovyMapBuilder.class);
+
+ public static final String MULTIPLE_INPUTS_SPLITTER = ",";
+
+ private TaskContext taskContext;
+
+ public GroovyMapBuilder(TaskContext taskContext) { // stores the context that build() reads every job setting from
+ this.taskContext = taskContext;
+ }
+
+ /**
+  * Collects everything the job script template needs into a {@link GroovyMapData}.
+  *
+  * Values are gathered from the task context in this order: directories and
+  * identifiers, input/output arguments, wall time from the job submission sub
+  * task, quality of service, scheduling settings of the process, and finally
+  * the environment, module, pre/post job and parallelism commands of the
+  * application deployment.
+  *
+  * Precedence rules visible here:
+  * - a wall time on the job submission sub task wins over the scheduling wall time limit;
+  * - the scheduling "override" allocation project number replaces the account
+  *   string taken from the task context.
+  *
+  * @return the populated map data
+  * @throws Exception if a non-serial parallelism type has no prefix configured in
+  *         the resource job manager's prefix map, or a task context lookup fails
+  */
+ public GroovyMapData build() throws Exception {
+     GroovyMapData mapData = new GroovyMapData();
+     mapData.setInputDir(taskContext.getInputDir());
+     mapData.setOutputDir(taskContext.getOutputDir());
+     mapData.setExecutablePath(taskContext.getApplicationDeploymentDescription().getExecutablePath());
+     mapData.setStdoutFile(taskContext.getStdoutLocation());
+     mapData.setStderrFile(taskContext.getStderrLocation());
+     mapData.setScratchLocation(taskContext.getScratchLocation());
+     mapData.setGatewayId(taskContext.getGatewayId());
+     mapData.setGatewayUserName(taskContext.getProcessModel().getUserName());
+     mapData.setApplicationName(taskContext.getApplicationInterfaceDescription().getApplicationName());
+     mapData.setQueueSpecificMacros(taskContext.getQueueSpecificMacros());
+     mapData.setAccountString(taskContext.getAllocationProjectNumber());
+     mapData.setReservation(taskContext.getReservation());
+     mapData.setJobName("A" + String.valueOf(generateJobName()));
+
+     // Command-line view: only values flagged to appear on the command line.
+     List<String> inputValues = getProcessInputValues(taskContext.getProcessModel().getProcessInputs(), true);
+     inputValues.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), true));
+     mapData.setInputs(inputValues);
+
+     // Full view: every input and output value. The outputs must be appended to
+     // this list, not to inputValues, which has already been handed to mapData.
+     List<String> inputValuesAll = getProcessInputValues(taskContext.getProcessModel().getProcessInputs(), false);
+     inputValuesAll.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), false));
+     mapData.setInputsAll(inputValuesAll);
+
+     mapData.setShellName("/bin/bash");
+
+     try {
+         JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel());
+         if (jobSubmissionTaskModel.getWallTime() > 0) {
+             mapData.setMaxWallTime(maxWallTimeCalculator(jobSubmissionTaskModel.getWallTime()));
+             // TODO use maxWallTimeCalculatorForLSF when the resource job manager type is LSF
+         }
+     } catch (TException e) {
+         // Best effort: the scheduling wall time limit below can still apply.
+         logger.error("Error while getting job submission sub task model", e);
+     }
+
+     // NOTE: Give precedence to the data that comes with the experiment.
+     // QoS is configured per queue.
+     String qoS = getQoS(taskContext.getQualityOfService(), taskContext.getQueueName());
+     if (qoS != null) {
+         mapData.setQualityOfService(qoS);
+     }
+     ComputationalResourceSchedulingModel scheduling = taskContext.getProcessModel().getProcessResourceSchedule();
+     if (scheduling != null) {
+         int totalNodeCount = scheduling.getNodeCount();
+         int totalCPUCount = scheduling.getTotalCPUCount();
+
+         if (isValid(scheduling.getQueueName())) {
+             mapData.setQueueName(scheduling.getQueueName());
+         }
+         if (totalNodeCount > 0) {
+             mapData.setNodes(totalNodeCount);
+         }
+         if (totalCPUCount > 0) {
+             // Processes per node is only defined when a node count is known;
+             // dividing by a zero node count would abort the whole build.
+             if (totalNodeCount > 0) {
+                 mapData.setProcessPerNode(totalCPUCount / totalNodeCount);
+             }
+             mapData.setCpuCount(totalCPUCount);
+         }
+         // max wall time may be set before this level if jobsubmission task has wall time configured to this job,
+         // if so we ignore scheduling configuration.
+         if (scheduling.getWallTimeLimit() > 0 && mapData.getMaxWallTime() == null) {
+             mapData.setMaxWallTime(maxWallTimeCalculator(scheduling.getWallTimeLimit()));
+             // TODO use maxWallTimeCalculatorForLSF when the resource job manager type is LSF
+         }
+         if (scheduling.getTotalPhysicalMemory() > 0) {
+             mapData.setUsedMem(scheduling.getTotalPhysicalMemory());
+         }
+         if (isValid(scheduling.getOverrideLoginUserName())) {
+             mapData.setUserName(scheduling.getOverrideLoginUserName());
+         }
+         if (isValid(scheduling.getOverrideAllocationProjectNumber())) {
+             mapData.setAccountString(scheduling.getOverrideAllocationProjectNumber());
+         }
+         if (isValid(scheduling.getStaticWorkingDir())) {
+             mapData.setWorkingDirectory(scheduling.getStaticWorkingDir());
+         }
+     } else {
+         logger.error("Task scheduling cannot be null at this point..");
+     }
+
+     ApplicationDeploymentDescription appDepDescription = taskContext.getApplicationDeploymentDescription();
+
+     // comparingInt is used for all orderings below; subtracting the order
+     // values can overflow and invert the comparison.
+     List<SetEnvPaths> exportCommands = appDepDescription.getSetEnvironment();
+     if (exportCommands != null) {
+         List<String> exportCommandList = exportCommands.stream()
+                 .sorted(Comparator.comparingInt(SetEnvPaths::getEnvPathOrder))
+                 .map(map -> map.getName() + "=" + map.getValue())
+                 .collect(Collectors.toList());
+         mapData.setExports(exportCommandList);
+     }
+
+     List<CommandObject> moduleCmds = appDepDescription.getModuleLoadCmds();
+     if (moduleCmds != null) {
+         List<String> modulesCmdCollect = moduleCmds.stream()
+                 .sorted(Comparator.comparingInt(CommandObject::getCommandOrder))
+                 .map(map -> map.getCommand())
+                 .collect(Collectors.toList());
+         mapData.setModuleCommands(modulesCmdCollect);
+     }
+
+     // Pre and post job commands are templates themselves and are expanded
+     // against the values collected so far.
+     List<CommandObject> preJobCommands = appDepDescription.getPreJobCommands();
+     if (preJobCommands != null) {
+         List<String> preJobCmdCollect = preJobCommands.stream()
+                 .sorted(Comparator.comparingInt(CommandObject::getCommandOrder))
+                 .map(map -> parseCommands(map.getCommand(), mapData))
+                 .collect(Collectors.toList());
+         mapData.setPreJobCommands(preJobCmdCollect);
+     }
+
+     List<CommandObject> postJobCommands = appDepDescription.getPostJobCommands();
+     if (postJobCommands != null) {
+         List<String> postJobCmdCollect = postJobCommands.stream()
+                 .sorted(Comparator.comparingInt(CommandObject::getCommandOrder))
+                 .map(map -> parseCommands(map.getCommand(), mapData))
+                 .collect(Collectors.toList());
+         mapData.setPostJobCommands(postJobCmdCollect);
+     }
+
+     ApplicationParallelismType parallelism = appDepDescription.getParallelism();
+     if (parallelism != null) {
+         if (parallelism != ApplicationParallelismType.SERIAL) {
+             Map<ApplicationParallelismType, String> parallelismPrefix = taskContext.getResourceJobManager().getParallelismPrefix();
+             if (parallelismPrefix != null){
+                 String parallelismCommand = parallelismPrefix.get(parallelism);
+                 if (parallelismCommand != null){
+                     mapData.setJobSubmitterCommand(parallelismCommand);
+                 }else {
+                     throw new Exception("Parallelism prefix is not defined for given parallelism type " + parallelism + ".. Please define the parallelism prefix at App Catalog");
+                 }
+             }
+         }
+     }
+
+     return mapData;
+ }
+
+ /**
+  * Generates a random, non-negative number used as the numeric part of a job name.
+  *
+  * A random non-negative int is shifted by 99999999. The shift may wrap around
+  * into the negative range, so the magnitude of the result is returned.
+  *
+  * @return a pseudo-random value in the range 0..Integer.MAX_VALUE
+  */
+ public static int generateJobName() {
+     Random random = new Random();
+     int i = random.nextInt(Integer.MAX_VALUE);
+     i = i + 99999999;
+     if (i == Integer.MIN_VALUE) {
+         // Integer.MIN_VALUE has no positive counterpart; negating it stays negative.
+         return Integer.MAX_VALUE;
+     }
+     return Math.abs(i);
+ }
+
+ /**
+  * Turns the process inputs into the ordered list of command-line tokens.
+  *
+  * Inputs are processed in ascending input order; inputs sharing the same order
+  * keep their original relative order. For every input the application
+  * argument (if any) is emitted first, followed by its value:
+  * - URI values are reduced to the file name,
+  * - URI_COLLECTION values are split on {@link #MULTIPLE_INPUTS_SPLITTER}, each
+  *   entry is reduced to its file name and the names are joined with a space,
+  * - every other value is emitted unchanged.
+  *
+  * @param processInputs   the inputs of the process; may be {@code null}
+  * @param commandLineOnly when {@code true}, inputs not flagged for the command line are skipped
+  * @return a new mutable list of tokens, empty when there are no inputs
+  */
+ private static List<String> getProcessInputValues(List<InputDataObjectType> processInputs, boolean commandLineOnly) {
+     List<String> inputValues = new ArrayList<String>();
+     if (processInputs == null) {
+         return inputValues;
+     }
+
+     // Stable sort on a copy. A TreeSet keyed on the input order silently drops
+     // every input whose order equals that of an earlier one.
+     List<InputDataObjectType> sortedInputs = new ArrayList<InputDataObjectType>(processInputs);
+     sortedInputs.sort(Comparator.comparingInt(InputDataObjectType::getInputOrder));
+
+     for (InputDataObjectType input : sortedInputs) {
+         if (commandLineOnly && !input.isRequiredToAddedToCommandLine()) {
+             continue;
+         }
+         String argument = input.getApplicationArgument();
+         if (argument != null && !argument.equals("")) {
+             inputValues.add(argument);
+         }
+
+         String value = input.getValue();
+         if (value == null || value.equals("")) {
+             continue;
+         }
+         if (input.getType() == DataType.URI) {
+             // set only the relative path
+             inputValues.add(fileNameOf(value));
+         } else if (input.getType() == DataType.URI_COLLECTION) {
+             // Every entry, including the last one, is reduced to its file name.
+             List<String> fileNames = new ArrayList<String>();
+             for (String path : value.split(MULTIPLE_INPUTS_SPLITTER)) {
+                 fileNames.add(fileNameOf(path));
+             }
+             // A value made only of separators yields no entries; emit nothing then.
+             if (!fileNames.isEmpty()) {
+                 inputValues.add(String.join(" ", fileNames));
+             }
+         } else {
+             inputValues.add(value);
+         }
+     }
+     return inputValues;
+ }
+
+ /** Returns the part of {@code path} after the last {@link File#separatorChar}, or the whole path if there is none. */
+ private static String fileNameOf(String path) {
+     return path.substring(path.lastIndexOf(File.separatorChar) + 1);
+ }
+
+ /**
+  * Turns the process outputs into command-line tokens.
+  *
+  * The application argument of every output is emitted whenever it is non-empty.
+  * The value is emitted, reduced to its file name, only for URI outputs with a
+  * non-empty value; in command-line mode the output must additionally be
+  * flagged as required on the command line.
+  *
+  * @param processOutputs  the outputs of the process; may be {@code null}
+  * @param commandLineOnly when {@code true}, values of outputs not flagged for the command line are skipped
+  * @return a new mutable list of tokens, empty when there are no outputs
+  */
+ private static List<String> getProcessOutputValues(List<OutputDataObjectType> processOutputs, boolean commandLineOnly) {
+     List<String> collected = new ArrayList<>();
+     if (processOutputs == null) {
+         return collected;
+     }
+     for (OutputDataObjectType current : processOutputs) {
+         String argument = current.getApplicationArgument();
+         if (argument != null && !argument.equals("")) {
+             collected.add(argument);
+         }
+
+         String value = current.getValue();
+         boolean hasValue = value != null && !value.equals("");
+         boolean eligible = hasValue && (!commandLineOnly || current.isRequiredToAddedToCommandLine());
+         if (eligible && current.getType() == DataType.URI) {
+             int nameStart = value.lastIndexOf(File.separatorChar) + 1;
+             collected.add(value.substring(nameStart, value.length()));
+         }
+     }
+     return collected;
+ }
+
+ /**
+  * Extracts the quality-of-service value configured for one queue.
+  *
+  * {@code qualityOfService} is read as a comma separated list of
+  * {@code queue=value} entries. The queue name is matched literally and only as
+  * a whole entry key, so "gpu" does not match an entry for "biggpu" and regex
+  * metacharacters in the queue name have no special meaning.
+  *
+  * @param qualityOfService    the per-queue configuration; may be {@code null} or empty
+  * @param preferredBatchQueue the queue to look up; may be {@code null} or empty
+  * @return the value configured for the queue (possibly empty), or {@code null}
+  *         when either argument is missing or the queue has no entry
+  */
+ static String getQoS(String qualityOfService, String preferredBatchQueue) {
+     if(preferredBatchQueue == null || preferredBatchQueue.isEmpty()
+             || qualityOfService == null || qualityOfService.isEmpty()) return null;
+     final String qos = "qos";
+     // Anchor the key at the start of an entry and quote it so it is taken literally.
+     Pattern pattern = Pattern.compile("(?:^|,)\\s*" + Pattern.quote(preferredBatchQueue)
+             + "=(?<" + qos + ">[^,]*)");
+     Matcher matcher = pattern.matcher(qualityOfService);
+     if (matcher.find()) {
+         return matcher.group(qos);
+     }
+     return null;
+ }
+
+ public static String maxWallTimeCalculator(int maxWalltime) {
+ if (maxWalltime < 60) {
+ return "00:" + maxWalltime + ":00";
+ } else {
+ int minutes = maxWalltime % 60;
+ int hours = maxWalltime / 60;
+ return hours + ":" + minutes + ":00";
+ }
+ }
+
+ public static String maxWallTimeCalculatorForLSF(int maxWalltime) {
+ if (maxWalltime < 60) {
+ return "00:" + maxWalltime;
+ } else {
+ int minutes = maxWalltime % 60;
+ int hours = maxWalltime / 60;
+ return hours + ":" + minutes;
+ }
+ }
+
+ /** Tells whether {@code str} carries a value, i.e. is neither {@code null} nor empty. */
+ private static boolean isValid(String str) {
+     if (str == null) {
+         return false;
+     }
+     return !str.isEmpty();
+ }
+
+ /**
+  * Expands a command template against the values collected in {@code bindMap}.
+  *
+  * @param value   the command, written as a Groovy GString template
+  * @param bindMap supplies the template bindings through {@code toImmutableMap()}
+  * @return the expanded command
+  * @throws IllegalArgumentException if the template cannot be created; the
+  *         underlying failure is kept as the cause
+  */
+ static String parseCommands(String value, GroovyMapData bindMap) {
+     TemplateEngine templateEngine = new GStringTemplateEngine();
+     try {
+         return templateEngine.createTemplate(value).make(bindMap.toImmutableMap()).toString();
+     } catch (ClassNotFoundException | IOException e) {
+         // Keep the cause so the actual parse problem is visible to whoever debugs the template.
+         throw new IllegalArgumentException("Error while parsing command " + value
+                 + " , Invalid command or incomplete bind map", e);
+     }
+ }
+
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
index ec75fb7..995f772 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
@@ -1,6 +1,14 @@
package org.apache.airavata.helix.impl.task.submission;
+import com.google.common.collect.ImmutableMap;
+import groovy.lang.Writable;
+import groovy.text.GStringTemplateEngine;
+import groovy.text.TemplateEngine;
+import org.apache.airavata.common.utils.ApplicationSettings;
+
+import java.io.File;
import java.lang.reflect.Field;
+import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,7 +43,7 @@ public class GroovyMapData {
private String applicationName;
@ScriptTag(name = "queueSpecificMacros")
- private String queueSpecificMacros;
+ private List<String> queueSpecificMacros;
@ScriptTag(name = "accountString")
private String accountString;
@@ -206,13 +214,12 @@ public class GroovyMapData {
return this;
}
- public String getQueueSpecificMacros() {
+ public List<String> getQueueSpecificMacros() {
return queueSpecificMacros;
}
- public GroovyMapData setQueueSpecificMacros(String queueSpecificMacros) {
+ public void setQueueSpecificMacros(List<String> queueSpecificMacros) {
this.queueSpecificMacros = queueSpecificMacros;
- return this;
}
public String getAccountString() {
@@ -412,4 +419,40 @@ public class GroovyMapData {
this.chassisName = chassisName;
return this;
}
+
+ /**
+  * Exposes the fields annotated with {@code ScriptTag} as template bindings.
+  *
+  * Every declared field of this object's class is made accessible; fields
+  * carrying a {@code ScriptTag} are stored under the tag's name with their
+  * current value (which may be {@code null}). A field that cannot be read is
+  * reported on standard error and left out.
+  *
+  * Note: despite the method name, the returned map is a plain mutable HashMap.
+  *
+  * @return a new map from tag name to current field value
+  */
+ public Map toImmutableMap() {
+
+     Map<String, Object> bindings = new HashMap<>();
+     for (Field candidate : this.getClass().getDeclaredFields()) {
+         candidate.setAccessible(true);
+         ScriptTag tag = candidate.getAnnotation(ScriptTag.class);
+         if (tag == null) {
+             continue;
+         }
+         try {
+             bindings.put(tag.name(), candidate.get(this));
+         } catch (IllegalAccessException e) {
+             e.printStackTrace();
+         }
+     }
+
+     return bindings;
+ }
+
+ public String getAsString(String templateName) throws Exception {
+ URL templateUrl = ApplicationSettings.loadFile(templateName);
+ if (templateUrl == null) {
+ String error = "Template file '" + templateName + "' not found";
+ throw new Exception(error);
+ }
+ File template = new File(templateUrl.getPath());
+ TemplateEngine engine = new GStringTemplateEngine();
+ Writable make;
+ try {
+
+ make = engine.createTemplate(template).make(toImmutableMap());
+ } catch (Exception e) {
+ throw new Exception("Error while generating script using groovy map");
+ }
+ return make.toString();
+ }
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
index fb9917f..fab4747 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task.submission.task;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.JobSubmissionOutput;
import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.submission.GroovyMapBuilder;
import org.apache.airavata.helix.impl.task.submission.GroovyMapData;
import org.apache.airavata.helix.impl.task.submission.SubmissionUtil;
import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
@@ -36,25 +37,24 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
@Override
public TaskResult onRun(TaskHelper taskHelper) {
try {
- GroovyMapData groovyMapData = new GroovyMapData();
+ GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build();
JobModel jobModel = new JobModel();
jobModel.setProcessId(getProcessId());
- jobModel.setWorkingDir(groovyMapData.getWorkingDirectory());
+ jobModel.setWorkingDir(mapData.getWorkingDirectory());
jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setTaskId(getTaskId());
- jobModel.setJobName(groovyMapData.getJobName());
+ jobModel.setJobName(mapData.getJobName());
- File jobFile = SubmissionUtil.createJobFile(groovyMapData);
+ if (mapData != null) {
+ //jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getComputeResourceId(),
+ getTaskContext().getJobSubmissionProtocol().name(),
+ getTaskContext().getComputeResourceCredentialToken());
-
- if (jobFile != null && jobFile.exists()) {
- jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
- AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(),
- getJobSubmissionProtocol().name(), getComputeResourceCredentialToken());
-
- JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, jobFile, groovyMapData.getWorkingDirectory());
+ JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory());
jobModel.setExitCode(submissionOutput.getExitCode());
jobModel.setStdErr(submissionOutput.getStdErr());
@@ -137,7 +137,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
String loadCommand = getComputeResourceDescription().getGatewayUsageModuleLoadCommand();
String usageExecutable = getComputeResourceDescription().getGatewayUsageExecutable();
ExperimentModel experiment = (ExperimentModel)getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, getExperimentId());
- String username = experiment.getUserName() + "@" + getGatewayComputeResourcePreference().getUsageReportingGatewayId();
+ String username = experiment.getUserName() + "@" + getTaskContext().getGatewayComputeResourcePreference().getUsageReportingGatewayId();
RawCommandInfo rawCommandInfo = new RawCommandInfo(loadCommand + " && " + usageExecutable + " -gateway_user " + username +
" -submit_time \"`date '+%F %T %:z'`\" -jobid " + jobId );
adaptor.executeCommand(rawCommandInfo.getRawCommand(), null);
@@ -150,7 +150,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
} else {
int verificationTryCount = 0;
while (verificationTryCount++ < 3) {
- String verifyJobId = verifyJobSubmission(adaptor, jobModel.getJobName(), getComputeResourceLoginUserName());
+ String verifyJobId = verifyJobSubmission(adaptor, jobModel.getJobName(), getTaskContext().getComputeResourceLoginUserName());
if (verifyJobId != null && !verifyJobId.isEmpty()) {
// JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
jobId = verifyJobId;
@@ -194,17 +194,12 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
}
} else {
+ return onFail("Job data is null", true, null);
+ // taskStatus.setReason("JobFile is null");
//taskStatus.setState(TaskState.FAILED);
- if (jobFile == null) {
- return onFail("Job file is null", true, null);
- // taskStatus.setReason("JobFile is null");
- } else {
- //taskStatus.setReason("Job file doesn't exist");
- return onFail("Job file doesn't exist", true, null);
- }
}
} catch (Exception e) {
- return onFail("Task failed due to unexpected issue", false, null);
+ return onFail("Task failed due to unexpected issue", false, e);
}
// TODO get rid of this
return onFail("Task moved to an unknown state", false, null);
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
index da04365..58b70ef 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
@@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task.submission.task;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.JobSubmissionOutput;
import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.submission.GroovyMapBuilder;
import org.apache.airavata.helix.impl.task.submission.GroovyMapData;
import org.apache.airavata.helix.impl.task.submission.SubmissionUtil;
import org.apache.airavata.helix.task.api.TaskHelper;
@@ -23,23 +24,23 @@ public class ForkJobSubmissionTask extends JobSubmissionTask {
public TaskResult onRun(TaskHelper taskHelper) {
try {
- GroovyMapData groovyMapData = new GroovyMapData();
+ GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build();
JobModel jobModel = new JobModel();
jobModel.setProcessId(getProcessId());
- jobModel.setWorkingDir(groovyMapData.getWorkingDirectory());
+ jobModel.setWorkingDir(mapData.getWorkingDirectory());
jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setTaskId(getTaskId());
- jobModel.setJobName(groovyMapData.getJobName());
+ jobModel.setJobName(mapData.getJobName());
- File jobFile = SubmissionUtil.createJobFile(groovyMapData);
+ if (mapData != null) {
+ //jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getComputeResourceId(),
+ getTaskContext().getJobSubmissionProtocol().name(),
+ getTaskContext().getComputeResourceCredentialToken());
- if (jobFile != null && jobFile.exists()) {
- jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
- AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(),
- getJobSubmissionProtocol().name(), getComputeResourceCredentialToken());
-
- JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, jobFile, groovyMapData.getWorkingDirectory());
+ JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory());
jobModel.setExitCode(submissionOutput.getExitCode());
jobModel.setStdErr(submissionOutput.getStdErr());
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
index fe5a3dc..11e59eb 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
@@ -4,11 +4,14 @@ import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.CommandOutput;
import org.apache.airavata.agents.api.JobSubmissionOutput;
import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.submission.GroovyMapData;
import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
@@ -23,9 +26,11 @@ import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.registry.cpi.*;
+import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import java.io.File;
+import java.security.SecureRandom;
import java.util.*;
public abstract class JobSubmissionTask extends AiravataTask {
@@ -38,10 +43,19 @@ public abstract class JobSubmissionTask extends AiravataTask {
}
//////////////////////
- protected JobSubmissionOutput submitBatchJob(AgentAdaptor agentAdaptor, File jobFile, String workingDirectory) throws Exception {
+ protected JobSubmissionOutput submitBatchJob(AgentAdaptor agentAdaptor, GroovyMapData groovyMapData, String workingDirectory) throws Exception {
JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
- getAppCatalog(), getJobSubmissionProtocol(), getPreferredJobSubmissionInterface()));
- RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobFile.getPath());
+ getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface()));
+
+ String scriptAsString = groovyMapData.getAsString(jobManagerConfiguration.getJobDescriptionTemplateName());
+
+ int number = new SecureRandom().nextInt();
+ number = (number < 0 ? -number : number);
+ File tempJobFile = new File(getLocalDataDir(), "job_" + Integer.toString(number) + jobManagerConfiguration.getScriptExtension());
+ FileUtils.writeStringToFile(tempJobFile, scriptAsString);
+
+ // TODO transfer file
+ RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, tempJobFile.getPath());
CommandOutput commandOutput = agentAdaptor.executeCommand(submitCommand.getRawCommand(), workingDirectory);
JobSubmissionOutput jsoutput = new JobSubmissionOutput();
@@ -63,12 +77,17 @@ public abstract class JobSubmissionTask extends AiravataTask {
jsoutput.setStdOut(commandOutput.getStdOut());
jsoutput.setStdErr(commandOutput.getStdError());
return jsoutput;
+ }
+ public File getLocalDataDir() {
+ String outputPath = ServerSettings.getLocalDataLocation();
+ outputPath = (outputPath.endsWith(File.separator) ? outputPath : outputPath + File.separator);
+ return new File(outputPath + getProcessId());
}
public JobStatus getJobStatus(AgentAdaptor agentAdaptor, String jobID) throws Exception {
JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
- getAppCatalog(), getJobSubmissionProtocol(), getPreferredJobSubmissionInterface()));
+ getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface()));
CommandOutput commandOutput = agentAdaptor.executeCommand(jobManagerConfiguration.getMonitorCommand(jobID).getRawCommand(), null);
return jobManagerConfiguration.getParser().parseJobStatus(jobID, commandOutput.getStdOut());
@@ -77,7 +96,7 @@ public abstract class JobSubmissionTask extends AiravataTask {
public String getJobIdByJobName(AgentAdaptor agentAdaptor, String jobName, String userName) throws Exception {
JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
- getAppCatalog(), getJobSubmissionProtocol(), getPreferredJobSubmissionInterface()));
+ getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface()));
RawCommandInfo jobIdMonitorCommand = jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName);
CommandOutput commandOutput = agentAdaptor.executeCommand(jobIdMonitorCommand.getRawCommand(), null);
@@ -159,44 +178,4 @@ public abstract class JobSubmissionTask extends AiravataTask {
///////////// required for groovy map
- private String workingDir;
- private String scratchLocation;
- private UserComputeResourcePreference userComputeResourcePreference;
-
- public String getWorkingDir() {
- if (workingDir == null) {
- if (getProcessModel().getProcessResourceSchedule().getStaticWorkingDir() != null){
- workingDir = getProcessModel().getProcessResourceSchedule().getStaticWorkingDir();
- }else {
- String scratchLocation = getScratchLocation();
- workingDir = (scratchLocation.endsWith("/") ? scratchLocation + getProcessId() : scratchLocation + "/" +
- getProcessId());
- }
- }
- return workingDir;
- }
-
- public String getScratchLocation() {
- if (scratchLocation == null) {
- if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getScratchLocation())) {
- scratchLocation = userComputeResourcePreference.getScratchLocation();
- } else if (isValid(processModel.getProcessResourceSchedule().getOverrideScratchLocation())) {
- scratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation();
- }else {
- scratchLocation = gatewayComputeResourcePreference.getScratchLocation();
- }
- }
- return scratchLocation;
- }
-
- protected UserComputeResourcePreference userComputeResourcePreference() throws AppCatalogException {
- UserComputeResourcePreference userComputeResourcePreference =
- getAppCatalog().getUserResourceProfile().getUserComputeResourcePreference(
- getProcessModel().getUserName(),
- getGatewayId(),
- getProcessModel().getComputeResourceId());
- }
-
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
index 5a3ca31..67ad0db 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
@@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task.submission.task;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.JobSubmissionOutput;
import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.submission.GroovyMapBuilder;
import org.apache.airavata.helix.impl.task.submission.GroovyMapData;
import org.apache.airavata.helix.impl.task.submission.SubmissionUtil;
import org.apache.airavata.helix.impl.task.submission.task.JobSubmissionTask;
@@ -41,10 +42,13 @@ public class LocalJobSubmissionTask extends JobSubmissionTask {
jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
saveJobModel(jobModel);
- AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(),
- getJobSubmissionProtocol().name(), getComputeResourceCredentialToken());
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getComputeResourceId(),
+ getTaskContext().getJobSubmissionProtocol().name(),
+ getTaskContext().getComputeResourceCredentialToken());
- JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, jobFile, groovyMapData.getWorkingDirectory());
+ GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build();
+ JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, groovyMapData.getWorkingDirectory());
JobStatus jobStatus = new JobStatus();
jobStatus.setJobState(JobState.SUBMITTED);
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
index 51feff4..397ff45 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
@@ -20,7 +20,7 @@ public class SimpleWorkflow {
defaultJobSubmissionTask.setGatewayId("default");
defaultJobSubmissionTask.setExperimentId("Clone_of_Mothur-Test1_0c9f627e-2c32-403e-a28a-2a8b10c21c1a");
defaultJobSubmissionTask.setProcessId("PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6");
- defaultJobSubmissionTask.setTaskId(UUID.randomUUID().toString());
+ defaultJobSubmissionTask.setTaskId("TASK_612844a4-aedb-41a5-824f-9b20c76867f7");
List<AbstractTask> tasks = new ArrayList<>();
tasks.add(defaultJobSubmissionTask);
diff --git a/modules/helix-spectator/src/main/resources/application.properties b/modules/helix-spectator/src/main/resources/application.properties
index 41c5e5f..a9b0969 100644
--- a/modules/helix-spectator/src/main/resources/application.properties
+++ b/modules/helix-spectator/src/main/resources/application.properties
@@ -1,3 +1,3 @@
zookeeper.connection.url=localhost:2199
helix.cluster.name=AiravataDemoCluster
-participant.name=all-p1
\ No newline at end of file
+participant.name=all-p2
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.