You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/06/16 23:06:19 UTC
airavata git commit: job submission task impl
Repository: airavata
Updated Branches:
refs/heads/master d05c0a166 -> 2bb805a02
job submission task impl
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2bb805a0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2bb805a0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2bb805a0
Branch: refs/heads/master
Commit: 2bb805a02fdb4f39fe05d133963822b21648e359
Parents: d05c0a1
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Tue Jun 16 17:06:12 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Tue Jun 16 17:06:12 2015 -0400
----------------------------------------------------------------------
.../apache/airavata/common/utils/Constants.java | 5 +
.../apache/airavata/gfac/core/GFacUtils.java | 506 ++++++++++---------
.../org/apache/airavata/gfac/impl/Factory.java | 1 -
.../gfac/impl/task/JobSubmissionTaskImpl.java | 47 +-
4 files changed, 319 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/2bb805a0/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
index 6e1cb84..83f0cc5 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -36,4 +36,9 @@ public final class Constants {
public static final String REMOTE_OAUTH_SERVER_URL = "remote.oauth.authorization.server";
public static final String ADMIN_USERNAME = "admin.user.name";
public static final String ADMIN_PASSWORD = "admin.password";
+
+ public static final String PBS_JOB_MANAGER = "pbs";
+ public static final String SLURM_JOB_MANAGER = "slurm";
+ public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE";
+ public static final String LSF_JOB_MANAGER = "LSF";
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2bb805a0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 7fe289c..63cc093 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -42,6 +42,7 @@ import org.apache.airavata.model.status.ExperimentState;
import org.apache.airavata.model.status.ExperimentStatus;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.*;
+import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
@@ -56,53 +57,58 @@ import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
+import javax.xml.transform.*;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
import javax.xml.xpath.*;
import java.io.*;
import java.net.InetAddress;
import java.net.URISyntaxException;
+import java.net.URL;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.security.SecureRandom;
import java.util.*;
//import org.apache.airavata.commons.gfac.type.ActualParameter;
public class GFacUtils {
- private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
- public static final ArrayList<ACL> OPEN_ACL_UNSAFE = ZooDefs.Ids.OPEN_ACL_UNSAFE;
-
- private GFacUtils() {
- }
-
-
- /**
- * Read data from inputStream and convert it to String.
- *
- * @param in
- * @return String read from inputStream
- * @throws java.io.IOException
- */
- public static String readFromStream(InputStream in) throws IOException {
- try {
- StringBuffer wsdlStr = new StringBuffer();
-
- int read;
-
- byte[] buf = new byte[1024];
- while ((read = in.read(buf)) > 0) {
- wsdlStr.append(new String(buf, 0, read));
- }
- return wsdlStr.toString();
- } finally {
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- log.warn("Cannot close InputStream: "
- + in.getClass().getName(), e);
- }
- }
- }
- }
+ private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
+ public static final ArrayList<ACL> OPEN_ACL_UNSAFE = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+ private GFacUtils() {
+ }
+
+
+ /**
+ * Read data from inputStream and convert it to String.
+ *
+ * @param in
+ * @return String read from inputStream
+ * @throws java.io.IOException
+ */
+ public static String readFromStream(InputStream in) throws IOException {
+ try {
+ StringBuffer wsdlStr = new StringBuffer();
+
+ int read;
+
+ byte[] buf = new byte[1024];
+ while ((read = in.read(buf)) > 0) {
+ wsdlStr.append(new String(buf, 0, read));
+ }
+ return wsdlStr.toString();
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ log.warn("Cannot close InputStream: "
+ + in.getClass().getName(), e);
+ }
+ }
+ }
+ }
// /**
// * This returns true if the give job is finished
@@ -119,30 +125,31 @@ public class GFacUtils {
// }
// }
- /**
- * This will read
- *
- * @param maxWalltime
- * @return
- */
- public static String maxWallTimeCalculator(int maxWalltime) {
- if (maxWalltime < 60) {
- return "00:" + maxWalltime + ":00";
- } else {
- int minutes = maxWalltime % 60;
- int hours = maxWalltime / 60;
- return hours + ":" + minutes + ":00";
- }
- }
- public static String maxWallTimeCalculatorForLSF(int maxWalltime) {
- if (maxWalltime < 60) {
- return "00:" + maxWalltime;
- } else {
- int minutes = maxWalltime % 60;
- int hours = maxWalltime / 60;
- return hours + ":" + minutes;
- }
- }
+ /**
+ * This will read
+ *
+ * @param maxWalltime
+ * @return
+ */
+ public static String maxWallTimeCalculator(int maxWalltime) {
+ if (maxWalltime < 60) {
+ return "00:" + maxWalltime + ":00";
+ } else {
+ int minutes = maxWalltime % 60;
+ int hours = maxWalltime / 60;
+ return hours + ":" + minutes + ":00";
+ }
+ }
+
+ public static String maxWallTimeCalculatorForLSF(int maxWalltime) {
+ if (maxWalltime < 60) {
+ return "00:" + maxWalltime;
+ } else {
+ int minutes = maxWalltime % 60;
+ int hours = maxWalltime / 60;
+ return hours + ":" + minutes;
+ }
+ }
// public static boolean isSynchronousMode(
// JobExecutionContext jobExecutionContext) {
@@ -155,44 +162,44 @@ public class GFacUtils {
// return true;
// }
- public static String readFileToString(String file)
- throws FileNotFoundException, IOException {
- BufferedReader instream = null;
- try {
-
- instream = new BufferedReader(new FileReader(file));
- StringBuffer buff = new StringBuffer();
- String temp = null;
- while ((temp = instream.readLine()) != null) {
- buff.append(temp);
- buff.append(GFacConstants.NEWLINE);
- }
- return buff.toString();
- } finally {
- if (instream != null) {
- try {
- instream.close();
- } catch (IOException e) {
- log.warn("Cannot close FileinputStream", e);
- }
- }
- }
- }
-
- public static boolean isLocalHost(String appHost)
- throws UnknownHostException {
- String localHost = InetAddress.getLocalHost().getCanonicalHostName();
- return (localHost.equals(appHost)
- || GFacConstants.LOCALHOST.equals(appHost) || GFacConstants._127_0_0_1
- .equals(appHost));
- }
-
- public static String createUniqueNameWithDate(String name) {
- String date = new Date().toString();
- date = date.replaceAll(" ", "_");
- date = date.replaceAll(":", "_");
- return name + "_" + date;
- }
+ public static String readFileToString(String file)
+ throws FileNotFoundException, IOException {
+ BufferedReader instream = null;
+ try {
+
+ instream = new BufferedReader(new FileReader(file));
+ StringBuffer buff = new StringBuffer();
+ String temp = null;
+ while ((temp = instream.readLine()) != null) {
+ buff.append(temp);
+ buff.append(GFacConstants.NEWLINE);
+ }
+ return buff.toString();
+ } finally {
+ if (instream != null) {
+ try {
+ instream.close();
+ } catch (IOException e) {
+ log.warn("Cannot close FileinputStream", e);
+ }
+ }
+ }
+ }
+
+ public static boolean isLocalHost(String appHost)
+ throws UnknownHostException {
+ String localHost = InetAddress.getLocalHost().getCanonicalHostName();
+ return (localHost.equals(appHost)
+ || GFacConstants.LOCALHOST.equals(appHost) || GFacConstants._127_0_0_1
+ .equals(appHost));
+ }
+
+ public static String createUniqueNameWithDate(String name) {
+ String date = new Date().toString();
+ date = date.replaceAll(" ", "_");
+ date = date.replaceAll(":", "_");
+ return name + "_" + date;
+ }
public static List<Element> getElementList(Document doc, String expression) throws XPathExpressionException {
XPathFactory xPathFactory = XPathFactory.newInstance();
@@ -209,17 +216,17 @@ public class GFacUtils {
return elementList;
}
- public static String createGsiftpURIAsString(String host, String localPath)
- throws URISyntaxException {
- StringBuffer buf = new StringBuffer();
- if (!host.startsWith("gsiftp://"))
- buf.append("gsiftp://");
- buf.append(host);
- if (!host.endsWith("/"))
- buf.append("/");
- buf.append(localPath);
- return buf.toString();
- }
+ public static String createGsiftpURIAsString(String host, String localPath)
+ throws URISyntaxException {
+ StringBuffer buf = new StringBuffer();
+ if (!host.startsWith("gsiftp://"))
+ buf.append("gsiftp://");
+ buf.append(host);
+ if (!host.endsWith("/"))
+ buf.append("/");
+ buf.append(localPath);
+ return buf.toString();
+ }
// public static void saveJobStatus(JobExecutionContext jobExecutionContext,
// JobDetails details, JobState state) throws GFacException {
@@ -446,9 +453,9 @@ public class GFacUtils {
private static void removeCancelDeliveryTagNode(String experimentPath, CuratorFramework curatorClient) throws Exception {
Stat exists = curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
if (exists != null) {
- ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
- }
- }
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
+ }
+ }
private static void copyChildren(CuratorFramework curatorClient, String oldPath, String newPath, int depth) throws Exception {
for (String childNode : curatorClient.getChildren().forPath(oldPath)) {
@@ -456,39 +463,39 @@ public class GFacUtils {
Stat stat = curatorClient.checkExists().forPath(oldChildPath); // no need to check exists
String newChildPath = newPath + File.separator + childNode;
log.info("Creating new znode: " + newChildPath);
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
- .forPath(newChildPath, curatorClient.getData().storingStatIn(stat).forPath(oldChildPath));
- if (--depth > 0) {
- copyChildren(curatorClient , oldChildPath, newChildPath, depth );
- }
- }
- }
-
- /**
- * This will return a value if the server is down because we iterate through exisiting experiment nodes, not
- * through gfac-server nodes
- *
- * @param experimentID
- * @param curatorClient
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
- public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
- String experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
- List<String> children = curatorClient.getChildren().forPath(experimentNode);
- for (String pickedChild : children) {
- String experimentPath = experimentNode + File.separator + pickedChild;
- String newExpNode = experimentPath + File.separator + experimentID;
- Stat exists = curatorClient.checkExists().forPath(newExpNode);
- if (exists == null) {
- continue;
- } else {
- return newExpNode;
- }
- }
- return null;
- }
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newChildPath, curatorClient.getData().storingStatIn(stat).forPath(oldChildPath));
+ if (--depth > 0) {
+ copyChildren(curatorClient, oldChildPath, newChildPath, depth);
+ }
+ }
+ }
+
+ /**
+ * This will return a value if the server is down because we iterate through exisiting experiment nodes, not
+ * through gfac-server nodes
+ *
+ * @param experimentID
+ * @param curatorClient
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
+ String experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
+ List<String> children = curatorClient.getChildren().forPath(experimentNode);
+ for (String pickedChild : children) {
+ String experimentPath = experimentNode + File.separator + pickedChild;
+ String newExpNode = experimentPath + File.separator + experimentID;
+ Stat exists = curatorClient.checkExists().forPath(newExpNode);
+ if (exists == null) {
+ continue;
+ } else {
+ return newExpNode;
+ }
+ }
+ return null;
+ }
public static boolean setExperimentCancel(String experimentId, CuratorFramework curatorClient, long deliveryTag) throws Exception {
String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
@@ -505,25 +512,26 @@ public class GFacUtils {
return false;
}
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
- .forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag)); // save cancel delivery tag to be acknowledge at the end.
- return true;
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag)); // save cancel delivery tag to be acknowledge at the end.
+ return true;
}
}
- public static boolean isCancelled(String experimentID, CuratorFramework curatorClient ) throws Exception {
- String experimentEntry = GFacUtils.findExperimentEntry(experimentID, curatorClient);
- if(experimentEntry == null){
+
+ public static boolean isCancelled(String experimentID, CuratorFramework curatorClient) throws Exception {
+ String experimentEntry = GFacUtils.findExperimentEntry(experimentID, curatorClient);
+ if (experimentEntry == null) {
return false;
- }else {
+ } else {
Stat exists = curatorClient.checkExists().forPath(experimentEntry);
if (exists != null) {
- String operation = new String(curatorClient.getData().storingStatIn(exists).forPath(experimentEntry + File.separator + "operation"));
- if ("cancel".equals(operation)) {
- return true;
- }
- }
- }
+ String operation = new String(curatorClient.getData().storingStatIn(exists).forPath(experimentEntry + File.separator + "operation"));
+ if ("cancel".equals(operation)) {
+ return true;
+ }
+ }
+ }
return false;
}
@@ -548,62 +556,62 @@ public class GFacUtils {
// }
// }
- public static String getHandlerData(JobExecutionContext jobExecutionContext, String className) throws Exception {
- CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
- if (curatorClient != null) {
- String expZnodeHandlerPath = AiravataZKUtils
- .getExpZnodeHandlerPath(
- jobExecutionContext.getExperimentID(),
- className);
- Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
- return new String(jobExecutionContext.getCuratorClient().getData().storingStatIn(exists).forPath(expZnodeHandlerPath));
- }
- return null;
- }
-
- public static CredentialReader getCredentialReader()
- throws ApplicationSettingsException, IllegalAccessException,
- InstantiationException {
- try{
- String jdbcUrl = ServerSettings.getCredentialStoreDBURL();
- String jdbcUsr = ServerSettings.getCredentialStoreDBUser();
- String jdbcPass = ServerSettings.getCredentialStoreDBPassword();
- String driver = ServerSettings.getCredentialStoreDBDriver();
- return new CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass,
- driver));
- }catch(ClassNotFoundException e){
- log.error("Not able to find driver: " + e.getLocalizedMessage());
- return null;
- }
- }
-
- public static LOCALSubmission getLocalJobSubmission (String submissionId) throws AppCatalogException{
+ public static String getHandlerData(JobExecutionContext jobExecutionContext, String className) throws Exception {
+ CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+ if (curatorClient != null) {
+ String expZnodeHandlerPath = AiravataZKUtils
+ .getExpZnodeHandlerPath(
+ jobExecutionContext.getExperimentID(),
+ className);
+ Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
+ return new String(jobExecutionContext.getCuratorClient().getData().storingStatIn(exists).forPath(expZnodeHandlerPath));
+ }
+ return null;
+ }
+
+ public static CredentialReader getCredentialReader()
+ throws ApplicationSettingsException, IllegalAccessException,
+ InstantiationException {
+ try {
+ String jdbcUrl = ServerSettings.getCredentialStoreDBURL();
+ String jdbcUsr = ServerSettings.getCredentialStoreDBUser();
+ String jdbcPass = ServerSettings.getCredentialStoreDBPassword();
+ String driver = ServerSettings.getCredentialStoreDBDriver();
+ return new CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass,
+ driver));
+ } catch (ClassNotFoundException e) {
+ log.error("Not able to find driver: " + e.getLocalizedMessage());
+ return null;
+ }
+ }
+
+ public static LOCALSubmission getLocalJobSubmission(String submissionId) throws AppCatalogException {
try {
AppCatalog appCatalog = RegistryFactory.getAppCatalog();
return appCatalog.getComputeResource().getLocalJobSubmission(submissionId);
- }catch (Exception e){
+ } catch (Exception e) {
String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
log.error(errorMsg, e);
throw new AppCatalogException(errorMsg, e);
}
}
- public static UnicoreJobSubmission getUnicoreJobSubmission (String submissionId) throws AppCatalogException{
+ public static UnicoreJobSubmission getUnicoreJobSubmission(String submissionId) throws AppCatalogException {
try {
AppCatalog appCatalog = RegistryFactory.getAppCatalog();
return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId);
- }catch (Exception e){
+ } catch (Exception e) {
String errorMsg = "Error while retrieving UNICORE job submission with submission id : " + submissionId;
log.error(errorMsg, e);
throw new AppCatalogException(errorMsg, e);
}
}
- public static SSHJobSubmission getSSHJobSubmission (String submissionId) throws AppCatalogException{
+ public static SSHJobSubmission getSSHJobSubmission(String submissionId) throws AppCatalogException {
try {
AppCatalog appCatalog = RegistryFactory.getAppCatalog();
return appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
- }catch (Exception e){
+ } catch (Exception e) {
String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
log.error(errorMsg, e);
throw new AppCatalogException(errorMsg, e);
@@ -612,39 +620,40 @@ public class GFacUtils {
/**
* To convert list to separated value
+ *
* @param listOfStrings
* @param separator
* @return
*/
- public static String listToCsv(List<String> listOfStrings, char separator) {
+ public static String listToCsv(List<String> listOfStrings, char separator) {
StringBuilder sb = new StringBuilder();
// all but last
- for(int i = 0; i < listOfStrings.size() - 1 ; i++) {
+ for (int i = 0; i < listOfStrings.size() - 1; i++) {
sb.append(listOfStrings.get(i));
sb.append(separator);
}
// last string, no separator
- if(listOfStrings.size() > 0){
- sb.append(listOfStrings.get(listOfStrings.size()-1));
+ if (listOfStrings.size() > 0) {
+ sb.append(listOfStrings.get(listOfStrings.size() - 1));
}
return sb.toString();
}
- public static byte[] longToBytes(long x) {
- ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
- buffer.putLong(x);
- return buffer.array();
- }
+ public static byte[] longToBytes(long x) {
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.putLong(x);
+ return buffer.array();
+ }
- public static long bytesToLong(byte[] bytes) {
- ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
- buffer.put(bytes);
- buffer.flip();//need flip
- return buffer.getLong();
- }
+ public static long bytesToLong(byte[] bytes) {
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.put(bytes);
+ buffer.flip();//need flip
+ return buffer.getLong();
+ }
public static ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws RegistryException {
ExperimentCatalog airavataExperimentCatalog = RegistryFactory.getDefaultExpCatalog();
@@ -668,7 +677,7 @@ public class GFacUtils {
return details.getExperimentStatus().getState();
}
- public static boolean isFailedJob (JobExecutionContext jec) {
+ public static boolean isFailedJob(JobExecutionContext jec) {
// JobStatus jobStatus = jec.getJobDetails().getJobStatus();
// if (jobStatus.getJobState() == JobState.FAILED) {
// return true;
@@ -687,10 +696,10 @@ public class GFacUtils {
// check cancel operation is being processed for the same experiment.
Stat cancelState = curatorClient.checkExists().forPath(cancelNodePath);
if (cancelState != null) {
- ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), cancelNodePath, true);
- return true;
- }
- }
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), cancelNodePath, true);
+ return true;
+ }
+ }
return false;
}
@@ -702,16 +711,16 @@ public class GFacUtils {
// publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
// }
- public static String getZKGfacServersParentPath() {
- return GFacConstants.ZOOKEEPER_SERVERS_NODE + GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE;
- }
+ public static String getZKGfacServersParentPath() {
+ return GFacConstants.ZOOKEEPER_SERVERS_NODE + GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE;
+ }
- public static JobDescriptor createJobDescriptor (ProcessContext processContext) throws GFacException, AppCatalogException, ApplicationSettingsException {
+ public static JobDescriptor createJobDescriptor(ProcessContext processContext) throws GFacException, AppCatalogException, ApplicationSettingsException {
JobDescriptor jobDescriptor = new JobDescriptor();
String emailIds = null;
ProcessModel processModel = processContext.getProcessModel();
ResourceJobManager resourceJobManager = getResourceJobManager(processContext);
- if (isEmailBasedJobMonitor(processContext)){
+ if (isEmailBasedJobMonitor(processContext)) {
emailIds = ServerSettings.getEmailBasedMonitorAddress();
}
if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
@@ -771,7 +780,7 @@ public class GFacUtils {
};
Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
for (InputDataObjectType input : processInputs) {
- sortedInputSet.add(input);
+ sortedInputSet.add(input);
}
for (InputDataObjectType inputDataObjectType : sortedInputSet) {
if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
@@ -789,7 +798,7 @@ public class GFacUtils {
String filePath = inputDataObjectType.getValue();
filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
inputValues.add(filePath);
- }else {
+ } else {
inputValues.add(inputDataObjectType.getValue());
}
@@ -840,8 +849,8 @@ public class GFacUtils {
}
if (scheduling.getWallTimeLimit() > 0) {
jobDescriptor.setMaxWallTime(String.valueOf(scheduling.getWallTimeLimit()));
- if (resourceJobManager != null){
- if(resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)){
+ if (resourceJobManager != null) {
+ if (resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)) {
jobDescriptor.setMaxWallTimeForLSF(String.valueOf(scheduling.getWallTimeLimit()));
}
}
@@ -874,9 +883,9 @@ public class GFacUtils {
}
ApplicationParallelismType parallelism = appDepDescription.getParallelism();
- if (parallelism != null){
- if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP || parallelism == ApplicationParallelismType.OPENMP_MPI){
- if (resourceJobManager != null){
+ if (parallelism != null) {
+ if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP || parallelism == ApplicationParallelismType.OPENMP_MPI) {
+ if (resourceJobManager != null) {
Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands();
if (jobManagerCommands != null && !jobManagerCommands.isEmpty()) {
for (JobManagerCommand command : jobManagerCommands.keySet()) {
@@ -896,7 +905,7 @@ public class GFacUtils {
Random random = new Random();
int i = random.nextInt(Integer.MAX_VALUE);
i = i + 99999999;
- if(i<0) {
+ if (i < 0) {
i = i * (-1);
}
return i;
@@ -943,7 +952,7 @@ public class GFacUtils {
}
}
- public static JobSubmissionInterface getPreferredJobSubmissionInterface (ProcessContext context) throws AppCatalogException {
+ public static JobSubmissionInterface getPreferredJobSubmissionInterface(ProcessContext context) throws AppCatalogException {
try {
String resourceHostId = context.getComputeResourceDescription().getComputeResourceId();
ComputeResourceDescription resourceDescription = context.getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
@@ -959,12 +968,12 @@ public class GFacUtils {
throw new AppCatalogException("Compute resource should have at least one job submission interface defined...");
}
return jobSubmissionInterfaces.get(0);
- }catch (AppCatalogException e){
+ } catch (AppCatalogException e) {
throw new AppCatalogException("Error occurred while retrieving data from app catalog", e);
}
}
- public static JobSubmissionProtocol getPreferredJobSubmissionProtocol (ProcessContext context) throws AppCatalogException{
+ public static JobSubmissionProtocol getPreferredJobSubmissionProtocol(ProcessContext context) throws AppCatalogException {
try {
GwyResourceProfile gatewayProfile = context.getAppCatalog().getGatewayProfile();
String resourceHostId = context.getComputeResourceDescription().getComputeResourceId();
@@ -976,7 +985,7 @@ public class GFacUtils {
}
}
- public static ComputeResourcePreference getComputeResourcePreference(ProcessContext context) throws AppCatalogException {
+ public static ComputeResourcePreference getComputeResourcePreference(ProcessContext context) throws AppCatalogException {
try {
GwyResourceProfile gatewayProfile = context.getAppCatalog().getGatewayProfile();
String resourceHostId = context.getComputeResourceDescription().getComputeResourceId();
@@ -987,4 +996,41 @@ public class GFacUtils {
}
}
+ /**
+ * Renders the job descriptor through the job manager's XSLT template and writes the resulting script to a file.
+ * @param jobDescriptor descriptor whose {@code toXML()} output is fed to the transformation
+ * @param jobManagerConfiguration supplies the template resource name and the script file extension
+ * @return the written script file, named by a random non-negative integer plus the script extension
+ * @throws GFacException if the configuration is null, the template is not on the classpath, or generation fails
+ */
+ public static File createJobFile(JobDescriptor jobDescriptor, JobManagerConfiguration jobManagerConfiguration) throws GFacException {
+ if (jobManagerConfiguration == null) {
+ throw new GFacException("Job manager configuration is not available, cannot create the job script file");
+ }
+ String templateName = jobManagerConfiguration.getJobDescriptionTemplateName();
+ URL resource = GFacUtils.class.getClassLoader().getResource(templateName);
+ if (resource == null) {
+ throw new GFacException("System configuration file '" + templateName + "' not found in the classpath");
+ }
+ try (InputStream templateStream = resource.openStream()) {
+ // Stream the template from its URL: new File(resource.getPath()) breaks inside a jar or on escaped paths.
+ Source xslt = new StreamSource(templateStream, resource.toExternalForm());
+ Transformer transformer = TransformerFactory.newInstance().newTransformer(xslt);
+ Source text = new StreamSource(new StringReader(jobDescriptor.toXML())); // characters, so no default-charset round trip
+ StringWriter results = new StringWriter();
+ transformer.transform(text, new StreamResult(results));
+ // NOTE(review): without (?m) this effectively matches only when the whole output is blank - confirm intent.
+ String scriptContent = results.toString().replaceAll("^[ |\t]*\n$", "");
+ if (scriptContent.startsWith("\n")) {
+ scriptContent = scriptContent.substring(1);
+ }
+ // nextInt(bound) is never negative, whereas negating nextInt() leaves Integer.MIN_VALUE negative.
+ int number = new SecureRandom().nextInt(Integer.MAX_VALUE);
+ File jobFile = new File(Integer.toString(number) + jobManagerConfiguration.getScriptExtension());
+ FileUtils.writeStringToFile(jobFile, scriptContent);
+ return jobFile;
+ } catch (IOException | TransformerException e) {
+ throw new GFacException("Error occurred while creating the temp job script file", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2bb805a0/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index a0d3a9b..1ca7f95 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -40,7 +40,6 @@ import org.apache.airavata.gfac.impl.job.SlurmOutputParser;
import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
import org.apache.airavata.gfac.impl.job.UGEOutputParser;
import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
-import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.AppCatalog;
http://git-wip-us.apache.org/repos/asf/airavata/blob/2bb805a0/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
index 2696236..7d65f18 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
@@ -22,22 +22,26 @@
package org.apache.airavata.gfac.impl.task;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.LocalEventPublisher;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.JobDescriptor;
+import org.apache.airavata.gfac.core.*;
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.JobSubmissionTask;
import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.util.Map;
public class JobSubmissionTaskImpl implements JobSubmissionTask {
@@ -51,12 +55,36 @@ public class JobSubmissionTaskImpl implements JobSubmissionTask {
public TaskState execute(TaskContext taskContext) throws TaskException {
try {
ProcessContext processContext = taskContext.getParentProcessContext();
- AppCatalog appCatalog = processContext.getAppCatalog();
- String resourceHostId = processContext.getProcessModel().getResourceSchedule().getResourceHostId();
- ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
- LocalEventPublisher publisher = processContext.getLocalEventPublisher();
RemoteCluster remoteCluster = processContext.getRemoteCluster();
JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext);
+ ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+ JobManagerConfiguration jConfig = null;
+ if (resourceJobManager != null){
+ String installedParentPath = resourceJobManager.getJobManagerBinPath();
+ if (installedParentPath == null) {
+ installedParentPath = "/";
+ }
+ ResourceJobManagerType resourceJobManagerType = resourceJobManager.getResourceJobManagerType();
+ if (resourceJobManagerType == null) {
+ log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+ jConfig = Factory.getPBSJobManager(installedParentPath);
+ } else {
+ if (Constants.PBS_JOB_MANAGER.equalsIgnoreCase(resourceJobManagerType.toString())) {
+ jConfig = Factory.getPBSJobManager(installedParentPath);
+ } else if (Constants.SLURM_JOB_MANAGER.equalsIgnoreCase(resourceJobManagerType.toString())) {
+ jConfig = Factory.getSLURMJobManager(installedParentPath);
+ } else if (Constants.SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(resourceJobManagerType.toString())) {
+ jConfig = Factory.getUGEJobManager(installedParentPath);
+ } else if (Constants.LSF_JOB_MANAGER.equals(resourceJobManagerType.toString())) {
+ jConfig = Factory.getLSFJobManager(installedParentPath);
+ }
+ }
+ }
+ File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
+ if (jobFile != null && jobFile.exists()){
+ String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
+ }
+
} catch (AppCatalogException e) {
log.error("Error while instatiating app catalog",e);
throw new TaskException("Error while instatiating app catalog", e);
@@ -66,9 +94,10 @@ public class JobSubmissionTaskImpl implements JobSubmissionTask {
} catch (GFacException e) {
log.error("Error occurred while creating job descriptor", e);
throw new TaskException("Error occurred while creating job descriptor", e);
+ } catch (SSHApiException e) {
+ log.error("Error occurred while submitting the job", e);
+ throw new TaskException("Error occurred while submitting the job", e);
}
-
-
return null;
}