You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/08/10 16:38:10 UTC
airavata git commit: Check reservation start and end time,
refactor createJobDescriptor god method
Repository: airavata
Updated Branches:
refs/heads/hotfix-for-allocation 876a1405c -> e759b5505
Check reservation start and end time, refactor createJobDescriptor god method
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e759b550
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e759b550
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e759b550
Branch: refs/heads/hotfix-for-allocation
Commit: e759b55050da5c6a1f2d601cbd6b8272127894e8
Parents: 876a140
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Aug 10 12:32:42 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Aug 10 12:38:00 2016 -0400
----------------------------------------------------------------------
.../apache/airavata/gfac/core/GFacUtils.java | 261 ++++++++++---------
1 file changed, 143 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/e759b550/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index afbf425..f7d53dc 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -408,45 +408,13 @@ public class GFacUtils {
return ZKPaths.makePath(ZkConstants.ZOOKEEPER_SERVERS_NODE, ZkConstants.ZOOKEEPER_GFAC_SERVER_NODE);
}
- public static JobDescriptor createJobDescriptor(ProcessContext processContext, TaskContext taskContext) throws GFacException, AppCatalogException, ApplicationSettingsException {
+ public static JobDescriptor createJobDescriptor(ProcessContext processContext, TaskContext taskContext)
+ throws GFacException, AppCatalogException, ApplicationSettingsException {
+
JobDescriptor jobDescriptor = new JobDescriptor();
- String emailIds = null;
ProcessModel processModel = processContext.getProcessModel();
ResourceJobManager resourceJobManager = getResourceJobManager(processContext);
- if (isEmailBasedJobMonitor(processContext)) {
- emailIds = ServerSettings.getEmailBasedMonitorAddress();
- }
- if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
- String flags = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS);
- if (flags != null && processContext.getComputeResourceDescription().getHostName().equals("stampede.tacc.xsede.org")) {
- flags = "ALL";
- }
- jobDescriptor.setMailOptions(flags);
-
- String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
- if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) {
- if (emailIds != null && !emailIds.isEmpty()) {
- emailIds += ("," + userJobNotifEmailIds);
- } else {
- emailIds = userJobNotifEmailIds;
- }
- }
- if (processModel.isEnableEmailNotification()) {
- List<String> emailList = processModel.getEmailAddresses();
- String elist = GFacUtils.listToCsv(emailList, ',');
- if (elist != null && !elist.isEmpty()) {
- if (emailIds != null && !emailIds.isEmpty()) {
- emailIds = emailIds + "," + elist;
- } else {
- emailIds = elist;
- }
- }
- }
- }
- if (emailIds != null && !emailIds.isEmpty()) {
- log.info("Email list: " + emailIds);
- jobDescriptor.setMailAddress(emailIds);
- }
+ setMailAddresses(processContext, jobDescriptor); // set email options and addresses
jobDescriptor.setInputDirectory(processContext.getInputDir());
jobDescriptor.setOutputDirectory(processContext.getOutputDir());
@@ -457,89 +425,16 @@ public class GFacUtils {
if (crp.getAllocationProjectNumber() != null) {
jobDescriptor.setAcountString(crp.getAllocationProjectNumber());
}
+ jobDescriptor.setReservation(getReservation(crp));
- if (crp.getQualityOfService() != null) {
- // qos per queue
- jobDescriptor.setQoS(
- getQoS(crp.getQualityOfService(), crp.getPreferredBatchQueue()));
- }
-
- jobDescriptor.setReservation(crp.getReservation());
// To make job name alpha numeric
jobDescriptor.setJobName("A" + String.valueOf(generateJobName()));
jobDescriptor.setWorkingDirectory(processContext.getWorkingDir());
- List<String> inputValues = new ArrayList<String>();
- List<InputDataObjectType> processInputs = processModel.getProcessInputs();
- if (processInputs != null) {
-
- // sort the inputs first and then build the command ListR
- Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
- @Override
- public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
- return inputDataObjectType.getInputOrder() - t1.getInputOrder();
- }
- };
- Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
- for (InputDataObjectType input : processInputs) {
- sortedInputSet.add(input);
- }
- for (InputDataObjectType inputDataObjectType : sortedInputSet) {
- if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
- continue;
- }
- if (inputDataObjectType.getApplicationArgument() != null
- && !inputDataObjectType.getApplicationArgument().equals("")) {
- inputValues.add(inputDataObjectType.getApplicationArgument());
- }
-
- if (inputDataObjectType.getValue() != null
- && !inputDataObjectType.getValue().equals("")) {
- if (inputDataObjectType.getType() == DataType.URI) {
- // set only the relative path
- String filePath = inputDataObjectType.getValue();
- filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
- inputValues.add(filePath);
- } else if (inputDataObjectType.getType() == DataType.URI_COLLECTION) {
- String filePaths = inputDataObjectType.getValue();
- String[] paths = filePaths.split(GFacConstants.MULTIPLE_INPUTS_SPLITTER);
- String filePath;
- String inputs = "";
- int i = 0;
- for (; i < paths.length - 1; i++) {
- filePath = paths[i];
- filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
- // File names separate by a space
- inputs += filePath + " ";
- }
- inputs += paths[i];
- inputValues.add(inputs);
- } else {
- inputValues.add(inputDataObjectType.getValue());
- }
-
- }
- }
- }
-
- List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs();
- if (processOutputs != null) {
- for (OutputDataObjectType output : processOutputs) {
- if (output.getApplicationArgument() != null
- && !output.getApplicationArgument().equals("")) {
- inputValues.add(output.getApplicationArgument());
- }
- if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) {
- if (output.getType() == DataType.URI) {
- String filePath = output.getValue();
- filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
- inputValues.add(filePath);
- }
- }
- }
- }
-
+ List<String> inputValues = getProcessInputValues(processModel.getProcessInputs());
+ inputValues.addAll(getProcessOutputValues(processModel.getProcessOutputs()));
jobDescriptor.setInputValues(inputValues);
+
jobDescriptor.setUserName(processContext.getJobSubmissionRemoteCluster().getServerInfo().getUserName());
jobDescriptor.setShellName("/bin/bash");
jobDescriptor.setAllEnvExport(true);
@@ -562,14 +457,11 @@ public class GFacUtils {
if (scheduling.getQueueName() != null) {
jobDescriptor.setQueueName(scheduling.getQueueName());
}
-
if (totalNodeCount > 0) {
jobDescriptor.setNodes(totalNodeCount);
}
-
- if (scheduling.getQueueName() != null) {
- jobDescriptor.setQueueName(scheduling.getQueueName());
- }
+ // qos per queue
+ jobDescriptor.setQoS(getQoS(crp.getQualityOfService(), scheduling.getQueueName()));
if (totalCPUCount > 0) {
int ppn = totalCPUCount / totalNodeCount;
jobDescriptor.setProcessesPerNode(ppn);
@@ -636,7 +528,140 @@ public class GFacUtils {
return jobDescriptor;
}
+ private static void setMailAddresses(ProcessContext processContext, JobDescriptor jobDescriptor)
+ throws GFacException, AppCatalogException, ApplicationSettingsException {
+
+ ProcessModel processModel = processContext.getProcessModel();
+ String emailIds = null;
+ if (isEmailBasedJobMonitor(processContext)) {
+ emailIds = ServerSettings.getEmailBasedMonitorAddress();
+ }
+ if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
+ String flags = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS);
+ if (flags != null && processContext.getComputeResourceDescription().getHostName().equals("stampede.tacc.xsede.org")) {
+ flags = "ALL";
+ }
+ jobDescriptor.setMailOptions(flags);
+
+ String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+ if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) {
+ if (emailIds != null && !emailIds.isEmpty()) {
+ emailIds += ("," + userJobNotifEmailIds);
+ } else {
+ emailIds = userJobNotifEmailIds;
+ }
+ }
+ if (processModel.isEnableEmailNotification()) {
+ List<String> emailList = processModel.getEmailAddresses();
+ String elist = GFacUtils.listToCsv(emailList, ',');
+ if (elist != null && !elist.isEmpty()) {
+ if (emailIds != null && !emailIds.isEmpty()) {
+ emailIds = emailIds + "," + elist;
+ } else {
+ emailIds = elist;
+ }
+ }
+ }
+ }
+ if (emailIds != null && !emailIds.isEmpty()) {
+ log.info("Email list: " + emailIds);
+ jobDescriptor.setMailAddress(emailIds);
+ }
+ }
+
+ private static String getReservation(ComputeResourcePreference crp) {
+ long start = crp.getReservationStartTime();
+ long end = crp.getReservationEndTime();
+ String reservation = null;
+ if (start > 0 && start < end) {
+ long now = Calendar.getInstance().getTimeInMillis();
+ if (now > start && now < end) {
+ reservation = crp.getReservation();
+ }
+ } else {
+ reservation = crp.getReservation();
+ }
+ return reservation;
+ }
+
+ private static List<String> getProcessOutputValues(List<OutputDataObjectType> processOutputs) {
+ List<String> inputValues = new ArrayList<>();
+ if (processOutputs != null) {
+ for (OutputDataObjectType output : processOutputs) {
+ if (output.getApplicationArgument() != null
+ && !output.getApplicationArgument().equals("")) {
+ inputValues.add(output.getApplicationArgument());
+ }
+ if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) {
+ if (output.getType() == DataType.URI) {
+ String filePath = output.getValue();
+ filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+ inputValues.add(filePath);
+ }
+ }
+ }
+ }
+ return inputValues;
+ }
+
+ private static List<String> getProcessInputValues(List<InputDataObjectType> processInputs) {
+ List<String> inputValues = new ArrayList<String>();
+ if (processInputs != null) {
+
+ // sort the inputs first and then build the command ListR
+ Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+ @Override
+ public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+ return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+ }
+ };
+ Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+ for (InputDataObjectType input : processInputs) {
+ sortedInputSet.add(input);
+ }
+ for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+ if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
+ continue;
+ }
+ if (inputDataObjectType.getApplicationArgument() != null
+ && !inputDataObjectType.getApplicationArgument().equals("")) {
+ inputValues.add(inputDataObjectType.getApplicationArgument());
+ }
+
+ if (inputDataObjectType.getValue() != null
+ && !inputDataObjectType.getValue().equals("")) {
+ if (inputDataObjectType.getType() == DataType.URI) {
+ // set only the relative path
+ String filePath = inputDataObjectType.getValue();
+ filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+ inputValues.add(filePath);
+ } else if (inputDataObjectType.getType() == DataType.URI_COLLECTION) {
+ String filePaths = inputDataObjectType.getValue();
+ String[] paths = filePaths.split(GFacConstants.MULTIPLE_INPUTS_SPLITTER);
+ String filePath;
+ String inputs = "";
+ int i = 0;
+ for (; i < paths.length - 1; i++) {
+ filePath = paths[i];
+ filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+ // File names separate by a space
+ inputs += filePath + " ";
+ }
+ inputs += paths[i];
+ inputValues.add(inputs);
+ } else {
+ inputValues.add(inputDataObjectType.getValue());
+ }
+
+ }
+ }
+ }
+ return inputValues;
+ }
+
private static String getQoS(String qualityOfService, String preferredBatchQueue) {
+ if(preferredBatchQueue == null || preferredBatchQueue.isEmpty()
+ || qualityOfService == null || qualityOfService.isEmpty()) return null;
final String qos = "qos";
Pattern pattern = Pattern.compile(preferredBatchQueue + "=(?<" + qos + ">[^,]*)");
Matcher matcher = pattern.matcher(qualityOfService);