You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/06/16 23:06:19 UTC

airavata git commit: job submission task impl

Repository: airavata
Updated Branches:
  refs/heads/master d05c0a166 -> 2bb805a02


job submission task impl


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2bb805a0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2bb805a0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2bb805a0

Branch: refs/heads/master
Commit: 2bb805a02fdb4f39fe05d133963822b21648e359
Parents: d05c0a1
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Tue Jun 16 17:06:12 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Tue Jun 16 17:06:12 2015 -0400

----------------------------------------------------------------------
 .../apache/airavata/common/utils/Constants.java |   5 +
 .../apache/airavata/gfac/core/GFacUtils.java    | 506 ++++++++++---------
 .../org/apache/airavata/gfac/impl/Factory.java  |   1 -
 .../gfac/impl/task/JobSubmissionTaskImpl.java   |  47 +-
 4 files changed, 319 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/2bb805a0/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
index 6e1cb84..83f0cc5 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -36,4 +36,9 @@ public final class Constants {
     public static final String REMOTE_OAUTH_SERVER_URL = "remote.oauth.authorization.server";
     public static final String ADMIN_USERNAME = "admin.user.name";
     public static final String ADMIN_PASSWORD = "admin.password";
+
+    public static final String PBS_JOB_MANAGER = "pbs";
+    public static final String SLURM_JOB_MANAGER = "slurm";
+    public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE";
+    public static final String LSF_JOB_MANAGER = "LSF";
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/2bb805a0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 7fe289c..63cc093 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -42,6 +42,7 @@ import org.apache.airavata.model.status.ExperimentState;
 import org.apache.airavata.model.status.ExperimentStatus;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.*;
+import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
@@ -56,53 +57,58 @@ import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
+import javax.xml.transform.*;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
 import javax.xml.xpath.*;
 import java.io.*;
 import java.net.InetAddress;
 import java.net.URISyntaxException;
+import java.net.URL;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.security.SecureRandom;
 import java.util.*;
 
 //import org.apache.airavata.commons.gfac.type.ActualParameter;
 
 public class GFacUtils {
-	private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
-	public static final ArrayList<ACL> OPEN_ACL_UNSAFE = ZooDefs.Ids.OPEN_ACL_UNSAFE;
-
-	private GFacUtils() {
-	}
-
-
-	/**
-	 * Read data from inputStream and convert it to String.
-	 * 
-	 * @param in
-	 * @return String read from inputStream
-	 * @throws java.io.IOException
-	 */
-	public static String readFromStream(InputStream in) throws IOException {
-		try {
-			StringBuffer wsdlStr = new StringBuffer();
-
-			int read;
-
-			byte[] buf = new byte[1024];
-			while ((read = in.read(buf)) > 0) {
-				wsdlStr.append(new String(buf, 0, read));
-			}
-			return wsdlStr.toString();
-		} finally {
-			if (in != null) {
-				try {
-					in.close();
-				} catch (IOException e) {
-					log.warn("Cannot close InputStream: "
-							+ in.getClass().getName(), e);
-				}
-			}
-		}
-	}
+    private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
+    public static final ArrayList<ACL> OPEN_ACL_UNSAFE = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+    private GFacUtils() {
+    }
+
+
+    /**
+     * Read data from inputStream and convert it to String.
+     *
+     * @param in
+     * @return String read from inputStream
+     * @throws java.io.IOException
+     */
+    public static String readFromStream(InputStream in) throws IOException {
+        try {
+            StringBuffer wsdlStr = new StringBuffer();
+
+            int read;
+
+            byte[] buf = new byte[1024];
+            while ((read = in.read(buf)) > 0) {
+                wsdlStr.append(new String(buf, 0, read));
+            }
+            return wsdlStr.toString();
+        } finally {
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    log.warn("Cannot close InputStream: "
+                            + in.getClass().getName(), e);
+                }
+            }
+        }
+    }
 
 //	/**
 //	 * This returns true if the give job is finished
@@ -119,30 +125,31 @@ public class GFacUtils {
 //		}
 //	}
 
-	/**
-	 * This will read
-	 *
-	 * @param maxWalltime
-	 * @return
-	 */
-	public static String maxWallTimeCalculator(int maxWalltime) {
-		if (maxWalltime < 60) {
-			return "00:" + maxWalltime + ":00";
-		} else {
-			int minutes = maxWalltime % 60;
-			int hours = maxWalltime / 60;
-			return hours + ":" + minutes + ":00";
-		}
-	}
-	public static String maxWallTimeCalculatorForLSF(int maxWalltime) {
-		if (maxWalltime < 60) {
-			return "00:" + maxWalltime;
-		} else {
-			int minutes = maxWalltime % 60;
-			int hours = maxWalltime / 60;
-			return hours + ":" + minutes;
-		}
-	}
+    /**
+     * This will read
+     *
+     * @param maxWalltime
+     * @return
+     */
+    public static String maxWallTimeCalculator(int maxWalltime) {
+        if (maxWalltime < 60) {
+            return "00:" + maxWalltime + ":00";
+        } else {
+            int minutes = maxWalltime % 60;
+            int hours = maxWalltime / 60;
+            return hours + ":" + minutes + ":00";
+        }
+    }
+
+    public static String maxWallTimeCalculatorForLSF(int maxWalltime) {
+        if (maxWalltime < 60) {
+            return "00:" + maxWalltime;
+        } else {
+            int minutes = maxWalltime % 60;
+            int hours = maxWalltime / 60;
+            return hours + ":" + minutes;
+        }
+    }
 
 //	public static boolean isSynchronousMode(
 //			JobExecutionContext jobExecutionContext) {
@@ -155,44 +162,44 @@ public class GFacUtils {
 //		return true;
 //	}
 
-	public static String readFileToString(String file)
-			throws FileNotFoundException, IOException {
-		BufferedReader instream = null;
-		try {
-
-			instream = new BufferedReader(new FileReader(file));
-			StringBuffer buff = new StringBuffer();
-			String temp = null;
-			while ((temp = instream.readLine()) != null) {
-				buff.append(temp);
-				buff.append(GFacConstants.NEWLINE);
-			}
-			return buff.toString();
-		} finally {
-			if (instream != null) {
-				try {
-					instream.close();
-				} catch (IOException e) {
-					log.warn("Cannot close FileinputStream", e);
-				}
-			}
-		}
-	}
-
-	public static boolean isLocalHost(String appHost)
-			throws UnknownHostException {
-		String localHost = InetAddress.getLocalHost().getCanonicalHostName();
-		return (localHost.equals(appHost)
-				|| GFacConstants.LOCALHOST.equals(appHost) || GFacConstants._127_0_0_1
-					.equals(appHost));
-	}
-
-	public static String createUniqueNameWithDate(String name) {
-		String date = new Date().toString();
-		date = date.replaceAll(" ", "_");
-		date = date.replaceAll(":", "_");
-		return name + "_" + date;
-	}
+    public static String readFileToString(String file)
+            throws FileNotFoundException, IOException {
+        BufferedReader instream = null;
+        try {
+
+            instream = new BufferedReader(new FileReader(file));
+            StringBuffer buff = new StringBuffer();
+            String temp = null;
+            while ((temp = instream.readLine()) != null) {
+                buff.append(temp);
+                buff.append(GFacConstants.NEWLINE);
+            }
+            return buff.toString();
+        } finally {
+            if (instream != null) {
+                try {
+                    instream.close();
+                } catch (IOException e) {
+                    log.warn("Cannot close FileinputStream", e);
+                }
+            }
+        }
+    }
+
+    public static boolean isLocalHost(String appHost)
+            throws UnknownHostException {
+        String localHost = InetAddress.getLocalHost().getCanonicalHostName();
+        return (localHost.equals(appHost)
+                || GFacConstants.LOCALHOST.equals(appHost) || GFacConstants._127_0_0_1
+                .equals(appHost));
+    }
+
+    public static String createUniqueNameWithDate(String name) {
+        String date = new Date().toString();
+        date = date.replaceAll(" ", "_");
+        date = date.replaceAll(":", "_");
+        return name + "_" + date;
+    }
 
     public static List<Element> getElementList(Document doc, String expression) throws XPathExpressionException {
         XPathFactory xPathFactory = XPathFactory.newInstance();
@@ -209,17 +216,17 @@ public class GFacUtils {
         return elementList;
     }
 
-	public static String createGsiftpURIAsString(String host, String localPath)
-			throws URISyntaxException {
-		StringBuffer buf = new StringBuffer();
-		if (!host.startsWith("gsiftp://"))
-			buf.append("gsiftp://");
-		buf.append(host);
-		if (!host.endsWith("/"))
-			buf.append("/");
-		buf.append(localPath);
-		return buf.toString();
-	}
+    public static String createGsiftpURIAsString(String host, String localPath)
+            throws URISyntaxException {
+        StringBuffer buf = new StringBuffer();
+        if (!host.startsWith("gsiftp://"))
+            buf.append("gsiftp://");
+        buf.append(host);
+        if (!host.endsWith("/"))
+            buf.append("/");
+        buf.append(localPath);
+        return buf.toString();
+    }
 
 //	public static void saveJobStatus(JobExecutionContext jobExecutionContext,
 //                                     JobDetails details, JobState state) throws GFacException {
@@ -446,9 +453,9 @@ public class GFacUtils {
     private static void removeCancelDeliveryTagNode(String experimentPath, CuratorFramework curatorClient) throws Exception {
         Stat exists = curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
         if (exists != null) {
-			ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
-		}
-	}
+            ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
+        }
+    }
 
     private static void copyChildren(CuratorFramework curatorClient, String oldPath, String newPath, int depth) throws Exception {
         for (String childNode : curatorClient.getChildren().forPath(oldPath)) {
@@ -456,39 +463,39 @@ public class GFacUtils {
             Stat stat = curatorClient.checkExists().forPath(oldChildPath); // no need to check exists
             String newChildPath = newPath + File.separator + childNode;
             log.info("Creating new znode: " + newChildPath);
-			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-					.forPath(newChildPath, curatorClient.getData().storingStatIn(stat).forPath(oldChildPath));
-			if (--depth > 0) {
-                copyChildren(curatorClient , oldChildPath, newChildPath, depth );
-            }
-        }
-    }
-
-	/**
-	 * This will return a value if the server is down because we iterate through exisiting experiment nodes, not
-	 * through gfac-server nodes
-	 *
-	 * @param experimentID
-	 * @param curatorClient
-	 * @return
-	 * @throws KeeperException
-	 * @throws InterruptedException
-	 */
-	public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
-		String experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
-		List<String> children = curatorClient.getChildren().forPath(experimentNode);
-		for (String pickedChild : children) {
-			String experimentPath = experimentNode + File.separator + pickedChild;
-			String newExpNode = experimentPath + File.separator + experimentID;
-			Stat exists = curatorClient.checkExists().forPath(newExpNode);
-			if (exists == null) {
-				continue;
-			} else {
-				return newExpNode;
-			}
-		}
-		return null;
-	}
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+                    .forPath(newChildPath, curatorClient.getData().storingStatIn(stat).forPath(oldChildPath));
+            if (--depth > 0) {
+                copyChildren(curatorClient, oldChildPath, newChildPath, depth);
+            }
+        }
+    }
+
+    /**
+     * This will return a value if the server is down because we iterate through exisiting experiment nodes, not
+     * through gfac-server nodes
+     *
+     * @param experimentID
+     * @param curatorClient
+     * @return
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
+        String experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
+        List<String> children = curatorClient.getChildren().forPath(experimentNode);
+        for (String pickedChild : children) {
+            String experimentPath = experimentNode + File.separator + pickedChild;
+            String newExpNode = experimentPath + File.separator + experimentID;
+            Stat exists = curatorClient.checkExists().forPath(newExpNode);
+            if (exists == null) {
+                continue;
+            } else {
+                return newExpNode;
+            }
+        }
+        return null;
+    }
 
     public static boolean setExperimentCancel(String experimentId, CuratorFramework curatorClient, long deliveryTag) throws Exception {
         String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
@@ -505,25 +512,26 @@ public class GFacUtils {
                 return false;
             }
 
-			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-					.forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag)); // save cancel delivery tag to be acknowledge at the end.
-			return true;
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+                    .forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag)); // save cancel delivery tag to be acknowledge at the end.
+            return true;
         }
 
     }
-    public static boolean isCancelled(String experimentID, CuratorFramework curatorClient ) throws Exception {
-		String experimentEntry = GFacUtils.findExperimentEntry(experimentID, curatorClient);
-        if(experimentEntry == null){
+
+    public static boolean isCancelled(String experimentID, CuratorFramework curatorClient) throws Exception {
+        String experimentEntry = GFacUtils.findExperimentEntry(experimentID, curatorClient);
+        if (experimentEntry == null) {
             return false;
-        }else {
+        } else {
             Stat exists = curatorClient.checkExists().forPath(experimentEntry);
             if (exists != null) {
-				String operation = new String(curatorClient.getData().storingStatIn(exists).forPath(experimentEntry + File.separator + "operation"));
-				if ("cancel".equals(operation)) {
-					return true;
-				}
-			}
-		}
+                String operation = new String(curatorClient.getData().storingStatIn(exists).forPath(experimentEntry + File.separator + "operation"));
+                if ("cancel".equals(operation)) {
+                    return true;
+                }
+            }
+        }
         return false;
     }
 
@@ -548,62 +556,62 @@ public class GFacUtils {
 //		}
 //	}
 
-	public static String getHandlerData(JobExecutionContext jobExecutionContext, String className) throws Exception {
-		CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
-		if (curatorClient != null) {
-			String expZnodeHandlerPath = AiravataZKUtils
-					.getExpZnodeHandlerPath(
-							jobExecutionContext.getExperimentID(),
-							className);
-			Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
-			return new String(jobExecutionContext.getCuratorClient().getData().storingStatIn(exists).forPath(expZnodeHandlerPath));
-		}
-		return null;
-	}
-
-	public static CredentialReader getCredentialReader()
-			throws ApplicationSettingsException, IllegalAccessException,
-			InstantiationException {
-		try{
-		String jdbcUrl = ServerSettings.getCredentialStoreDBURL();
-		String jdbcUsr = ServerSettings.getCredentialStoreDBUser();
-		String jdbcPass = ServerSettings.getCredentialStoreDBPassword();
-		String driver = ServerSettings.getCredentialStoreDBDriver();
-		return new CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass,
-				driver));
-		}catch(ClassNotFoundException e){
-			log.error("Not able to find driver: " + e.getLocalizedMessage());
-			return null;	
-		}
-	}
-
-    public static LOCALSubmission getLocalJobSubmission (String submissionId) throws AppCatalogException{
+    public static String getHandlerData(JobExecutionContext jobExecutionContext, String className) throws Exception {
+        CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+        if (curatorClient != null) {
+            String expZnodeHandlerPath = AiravataZKUtils
+                    .getExpZnodeHandlerPath(
+                            jobExecutionContext.getExperimentID(),
+                            className);
+            Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
+            return new String(jobExecutionContext.getCuratorClient().getData().storingStatIn(exists).forPath(expZnodeHandlerPath));
+        }
+        return null;
+    }
+
+    public static CredentialReader getCredentialReader()
+            throws ApplicationSettingsException, IllegalAccessException,
+            InstantiationException {
+        try {
+            String jdbcUrl = ServerSettings.getCredentialStoreDBURL();
+            String jdbcUsr = ServerSettings.getCredentialStoreDBUser();
+            String jdbcPass = ServerSettings.getCredentialStoreDBPassword();
+            String driver = ServerSettings.getCredentialStoreDBDriver();
+            return new CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass,
+                    driver));
+        } catch (ClassNotFoundException e) {
+            log.error("Not able to find driver: " + e.getLocalizedMessage());
+            return null;
+        }
+    }
+
+    public static LOCALSubmission getLocalJobSubmission(String submissionId) throws AppCatalogException {
         try {
             AppCatalog appCatalog = RegistryFactory.getAppCatalog();
             return appCatalog.getComputeResource().getLocalJobSubmission(submissionId);
-        }catch (Exception e){
+        } catch (Exception e) {
             String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
             log.error(errorMsg, e);
             throw new AppCatalogException(errorMsg, e);
         }
     }
 
-    public static UnicoreJobSubmission getUnicoreJobSubmission (String submissionId) throws AppCatalogException{
+    public static UnicoreJobSubmission getUnicoreJobSubmission(String submissionId) throws AppCatalogException {
         try {
             AppCatalog appCatalog = RegistryFactory.getAppCatalog();
             return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId);
-        }catch (Exception e){
+        } catch (Exception e) {
             String errorMsg = "Error while retrieving UNICORE job submission with submission id : " + submissionId;
             log.error(errorMsg, e);
             throw new AppCatalogException(errorMsg, e);
         }
     }
 
-    public static SSHJobSubmission getSSHJobSubmission (String submissionId) throws AppCatalogException{
+    public static SSHJobSubmission getSSHJobSubmission(String submissionId) throws AppCatalogException {
         try {
             AppCatalog appCatalog = RegistryFactory.getAppCatalog();
             return appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
-        }catch (Exception e){
+        } catch (Exception e) {
             String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
             log.error(errorMsg, e);
             throw new AppCatalogException(errorMsg, e);
@@ -612,39 +620,40 @@ public class GFacUtils {
 
     /**
      * To convert list to separated value
+     *
      * @param listOfStrings
      * @param separator
      * @return
      */
-    public static  String listToCsv(List<String> listOfStrings, char separator) {
+    public static String listToCsv(List<String> listOfStrings, char separator) {
         StringBuilder sb = new StringBuilder();
 
         // all but last
-        for(int i = 0; i < listOfStrings.size() - 1 ; i++) {
+        for (int i = 0; i < listOfStrings.size() - 1; i++) {
             sb.append(listOfStrings.get(i));
             sb.append(separator);
         }
 
         // last string, no separator
-        if(listOfStrings.size() > 0){
-            sb.append(listOfStrings.get(listOfStrings.size()-1));
+        if (listOfStrings.size() > 0) {
+            sb.append(listOfStrings.get(listOfStrings.size() - 1));
         }
 
         return sb.toString();
     }
 
-	public static byte[] longToBytes(long x) {
-		ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
-		buffer.putLong(x);
-		return buffer.array();
-	}
+    public static byte[] longToBytes(long x) {
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(x);
+        return buffer.array();
+    }
 
-	public static long bytesToLong(byte[] bytes) {
-		ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
-		buffer.put(bytes);
-		buffer.flip();//need flip
-		return buffer.getLong();
-	}
+    public static long bytesToLong(byte[] bytes) {
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.put(bytes);
+        buffer.flip();//need flip
+        return buffer.getLong();
+    }
 
     public static ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws RegistryException {
         ExperimentCatalog airavataExperimentCatalog = RegistryFactory.getDefaultExpCatalog();
@@ -668,7 +677,7 @@ public class GFacUtils {
         return details.getExperimentStatus().getState();
     }
 
-    public static boolean isFailedJob (JobExecutionContext jec) {
+    public static boolean isFailedJob(JobExecutionContext jec) {
 //        JobStatus jobStatus = jec.getJobDetails().getJobStatus();
 //        if (jobStatus.getJobState() == JobState.FAILED) {
 //            return true;
@@ -687,10 +696,10 @@ public class GFacUtils {
             // check cancel operation is being processed for the same experiment.
             Stat cancelState = curatorClient.checkExists().forPath(cancelNodePath);
             if (cancelState != null) {
-				ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), cancelNodePath, true);
-				return true;
-			}
-		}
+                ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), cancelNodePath, true);
+                return true;
+            }
+        }
         return false;
     }
 
@@ -702,16 +711,16 @@ public class GFacUtils {
 //        publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
 //    }
 
-	public static String getZKGfacServersParentPath() {
-		return GFacConstants.ZOOKEEPER_SERVERS_NODE + GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE;
-	}
+    public static String getZKGfacServersParentPath() {
+        return GFacConstants.ZOOKEEPER_SERVERS_NODE + GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE;
+    }
 
-    public static JobDescriptor createJobDescriptor (ProcessContext processContext) throws GFacException, AppCatalogException, ApplicationSettingsException {
+    public static JobDescriptor createJobDescriptor(ProcessContext processContext) throws GFacException, AppCatalogException, ApplicationSettingsException {
         JobDescriptor jobDescriptor = new JobDescriptor();
         String emailIds = null;
         ProcessModel processModel = processContext.getProcessModel();
         ResourceJobManager resourceJobManager = getResourceJobManager(processContext);
-        if (isEmailBasedJobMonitor(processContext)){
+        if (isEmailBasedJobMonitor(processContext)) {
             emailIds = ServerSettings.getEmailBasedMonitorAddress();
         }
         if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
@@ -771,7 +780,7 @@ public class GFacUtils {
         };
         Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
         for (InputDataObjectType input : processInputs) {
-                sortedInputSet.add(input);
+            sortedInputSet.add(input);
         }
         for (InputDataObjectType inputDataObjectType : sortedInputSet) {
             if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
@@ -789,7 +798,7 @@ public class GFacUtils {
                     String filePath = inputDataObjectType.getValue();
                     filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
                     inputValues.add(filePath);
-                }else {
+                } else {
                     inputValues.add(inputDataObjectType.getValue());
                 }
 
@@ -840,8 +849,8 @@ public class GFacUtils {
             }
             if (scheduling.getWallTimeLimit() > 0) {
                 jobDescriptor.setMaxWallTime(String.valueOf(scheduling.getWallTimeLimit()));
-                if (resourceJobManager != null){
-                    if(resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)){
+                if (resourceJobManager != null) {
+                    if (resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)) {
                         jobDescriptor.setMaxWallTimeForLSF(String.valueOf(scheduling.getWallTimeLimit()));
                     }
                 }
@@ -874,9 +883,9 @@ public class GFacUtils {
         }
 
         ApplicationParallelismType parallelism = appDepDescription.getParallelism();
-        if (parallelism != null){
-            if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP || parallelism == ApplicationParallelismType.OPENMP_MPI){
-                if (resourceJobManager != null){
+        if (parallelism != null) {
+            if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP || parallelism == ApplicationParallelismType.OPENMP_MPI) {
+                if (resourceJobManager != null) {
                     Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands();
                     if (jobManagerCommands != null && !jobManagerCommands.isEmpty()) {
                         for (JobManagerCommand command : jobManagerCommands.keySet()) {
@@ -896,7 +905,7 @@ public class GFacUtils {
         Random random = new Random();
         int i = random.nextInt(Integer.MAX_VALUE);
         i = i + 99999999;
-        if(i<0) {
+        if (i < 0) {
             i = i * (-1);
         }
         return i;
@@ -943,7 +952,7 @@ public class GFacUtils {
         }
     }
 
-    public static JobSubmissionInterface getPreferredJobSubmissionInterface (ProcessContext context) throws AppCatalogException {
+    public static JobSubmissionInterface getPreferredJobSubmissionInterface(ProcessContext context) throws AppCatalogException {
         try {
             String resourceHostId = context.getComputeResourceDescription().getComputeResourceId();
             ComputeResourceDescription resourceDescription = context.getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
@@ -959,12 +968,12 @@ public class GFacUtils {
                 throw new AppCatalogException("Compute resource should have at least one job submission interface defined...");
             }
             return jobSubmissionInterfaces.get(0);
-        }catch (AppCatalogException e){
+        } catch (AppCatalogException e) {
             throw new AppCatalogException("Error occurred while retrieving data from app catalog", e);
         }
     }
 
-    public static JobSubmissionProtocol getPreferredJobSubmissionProtocol (ProcessContext context) throws AppCatalogException{
+    public static JobSubmissionProtocol getPreferredJobSubmissionProtocol(ProcessContext context) throws AppCatalogException {
         try {
             GwyResourceProfile gatewayProfile = context.getAppCatalog().getGatewayProfile();
             String resourceHostId = context.getComputeResourceDescription().getComputeResourceId();
@@ -976,7 +985,7 @@ public class GFacUtils {
         }
     }
 
-    public static  ComputeResourcePreference getComputeResourcePreference(ProcessContext context) throws AppCatalogException {
+    public static ComputeResourcePreference getComputeResourcePreference(ProcessContext context) throws AppCatalogException {
         try {
             GwyResourceProfile gatewayProfile = context.getAppCatalog().getGatewayProfile();
             String resourceHostId = context.getComputeResourceDescription().getComputeResourceId();
@@ -987,4 +996,41 @@ public class GFacUtils {
         }
     }
 
+    public static File createJobFile(JobDescriptor jobDescriptor, JobManagerConfiguration jobManagerConfiguration) throws GFacException {
+        try {
+            TransformerFactory factory = TransformerFactory.newInstance();
+            URL resource = GFacUtils.class.getClassLoader().getResource(jobManagerConfiguration.getJobDescriptionTemplateName());
+
+            if (resource == null) {
+                String error = "System configuration file '" + jobManagerConfiguration.getJobDescriptionTemplateName()
+                        + "' not found in the classpath";
+                throw new GFacException(error);
+            }
+
+            Source xslt = new StreamSource(new File(resource.getPath()));
+            Transformer transformer;
+            StringWriter results = new StringWriter();
+            File tempPBSFile = null;
+            // generate the pbs script using xslt
+            transformer = factory.newTransformer(xslt);
+            Source text = new StreamSource(new ByteArrayInputStream(jobDescriptor.toXML().getBytes()));
+            transformer.transform(text, new StreamResult(results));
+            String scriptContent = results.toString().replaceAll("^[ |\t]*\n$", "");
+            if (scriptContent.startsWith("\n")) {
+                scriptContent = scriptContent.substring(1);
+            }
+            // creating a temporary file using pbs script generated above
+            int number = new SecureRandom().nextInt();
+            number = (number < 0 ? -number : number);
+            tempPBSFile = new File(Integer.toString(number) + jobManagerConfiguration.getScriptExtension());
+            FileUtils.writeStringToFile(tempPBSFile, scriptContent);
+            return tempPBSFile;
+        } catch (IOException e) {
+            throw new GFacException("Error occurred while creating the temp job script file", e);
+        } catch (TransformerConfigurationException e) {
+            throw new GFacException("Error occurred while creating the temp job script file", e);
+        } catch (TransformerException e) {
+            throw new GFacException("Error occurred while creating the temp job script file", e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/2bb805a0/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index a0d3a9b..1ca7f95 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -40,7 +40,6 @@ import org.apache.airavata.gfac.impl.job.SlurmOutputParser;
 import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
 import org.apache.airavata.gfac.impl.job.UGEOutputParser;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
-import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.AppCatalog;

http://git-wip-us.apache.org/repos/asf/airavata/blob/2bb805a0/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
index 2696236..7d65f18 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
@@ -22,22 +22,26 @@
 package org.apache.airavata.gfac.impl.task;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.LocalEventPublisher;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.JobDescriptor;
+import org.apache.airavata.gfac.core.*;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.Map;
 
 public class JobSubmissionTaskImpl implements JobSubmissionTask {
@@ -51,12 +55,36 @@ public class JobSubmissionTaskImpl implements JobSubmissionTask {
     public TaskState execute(TaskContext taskContext) throws TaskException {
         try {
             ProcessContext processContext = taskContext.getParentProcessContext();
-            AppCatalog appCatalog = processContext.getAppCatalog();
-            String resourceHostId = processContext.getProcessModel().getResourceSchedule().getResourceHostId();
-            ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
-            LocalEventPublisher publisher = processContext.getLocalEventPublisher();
             RemoteCluster remoteCluster = processContext.getRemoteCluster();
             JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext);
+            ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+            JobManagerConfiguration jConfig = null;
+            if (resourceJobManager != null){
+                String installedParentPath = resourceJobManager.getJobManagerBinPath();
+                if (installedParentPath == null) {
+                    installedParentPath = "/";
+                }
+                ResourceJobManagerType resourceJobManagerType = resourceJobManager.getResourceJobManagerType();
+                if (resourceJobManagerType == null) {
+                    log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+                    jConfig = Factory.getPBSJobManager(installedParentPath);
+                } else {
+                    if (Constants.PBS_JOB_MANAGER.equalsIgnoreCase(resourceJobManagerType.toString())) {
+                        jConfig = Factory.getPBSJobManager(installedParentPath);
+                    } else if (Constants.SLURM_JOB_MANAGER.equalsIgnoreCase(resourceJobManagerType.toString())) {
+                        jConfig = Factory.getSLURMJobManager(installedParentPath);
+                    } else if (Constants.SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(resourceJobManagerType.toString())) {
+                        jConfig = Factory.getUGEJobManager(installedParentPath);
+                    } else if (Constants.LSF_JOB_MANAGER.equals(resourceJobManagerType.toString())) {
+                        jConfig = Factory.getLSFJobManager(installedParentPath);
+                    }
+                }
+            }
+            File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
+            if (jobFile != null && jobFile.exists()){
+                String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
+            }
+
         } catch (AppCatalogException e) {
             log.error("Error while instatiating app catalog",e);
             throw new TaskException("Error while instatiating app catalog", e);
@@ -66,9 +94,10 @@ public class JobSubmissionTaskImpl implements JobSubmissionTask {
         } catch (GFacException e) {
             log.error("Error occurred while creating job descriptor", e);
             throw new TaskException("Error occurred while creating job descriptor", e);
+        } catch (SSHApiException e) {
+            log.error("Error occurred while submitting the job", e);
+            throw new TaskException("Error occurred while submitting the job", e);
         }
-
-
         return null;
     }