You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/06/16 22:03:02 UTC

airavata git commit: updated process context and create job descriptor

Repository: airavata
Updated Branches:
  refs/heads/master d9b2df033 -> a2b6bdfd9


updated process context and create job descriptor


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a2b6bdfd
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a2b6bdfd
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a2b6bdfd

Branch: refs/heads/master
Commit: a2b6bdfd99ebc27724b5cc97fd589055f6785d89
Parents: d9b2df0
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Tue Jun 16 16:02:58 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Tue Jun 16 16:02:58 2015 -0400

----------------------------------------------------------------------
 .../apache/airavata/gfac/core/GFacUtils.java    | 891 ++++++++++++-------
 .../gfac/core/context/ProcessContext.java       |  66 ++
 .../gfac/impl/task/JobSubmissionTaskImpl.java   |  79 ++
 3 files changed, 713 insertions(+), 323 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a2b6bdfd/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index ae48ed7..7fe289c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -20,42 +20,28 @@
  */
 package org.apache.airavata.gfac.core;
 
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.DBUtil;
-import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.states.GfacExperimentState;
-import org.apache.airavata.gfac.core.states.GfacHandlerState;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
-import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
-import org.apache.airavata.model.messaging.event.JobIdentifier;
-import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
-import org.apache.airavata.model.messaging.event.TaskIdentifier;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
-import org.apache.airavata.model.experiment.ActionableGroup;
-import org.apache.airavata.model.experiment.CorrectiveAction;
-import org.apache.airavata.model.experiment.ErrorCategory;
-import org.apache.airavata.model.experiment.ErrorDetails;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.experiment.ExperimentModel;
-import org.apache.airavata.model.experiment.ExperimentState;
-import org.apache.airavata.model.experiment.JobDetails;
-import org.apache.airavata.model.experiment.JobState;
-import org.apache.airavata.model.experiment.JobStatus;
-import org.apache.airavata.model.experiment.TaskState;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.status.ExperimentState;
+import org.apache.airavata.model.status.ExperimentStatus;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.*;
-import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
@@ -70,27 +56,13 @@ import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpression;
-import javax.xml.xpath.XPathExpressionException;
-import javax.xml.xpath.XPathFactory;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
+import javax.xml.xpath.*;
+import java.io.*;
 import java.net.InetAddress;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 //import org.apache.airavata.commons.gfac.type.ActualParameter;
 
@@ -132,20 +104,20 @@ public class GFacUtils {
 		}
 	}
 
-	/**
-	 * This returns true if the give job is finished
-	 * otherwise false
-	 *
-	 * @param job
-	 * @return
-	 */
-	public static boolean isJobFinished(JobDescriptor job) {
-		if (org.apache.airavata.gfac.core.cluster.JobStatus.C.toString().equals(job.getStatus())) {
-			return true;
-		} else {
-			return false;
-		}
-	}
+//	/**
+//	 * This returns true if the give job is finished
+//	 * otherwise false
+//	 *
+//	 * @param job
+//	 * @return
+//	 */
+//	public static boolean isJobFinished(JobDescriptor job) {
+//		if (org.apache.airavata.gfac.core.cluster.JobStatus.C.toString().equals(job.getStatus())) {
+//			return true;
+//		} else {
+//			return false;
+//		}
+//	}
 
 	/**
 	 * This will read
@@ -171,22 +143,17 @@ public class GFacUtils {
 			return hours + ":" + minutes;
 		}
 	}
-	/**
-	 * this can be used to do framework opertaions specific to different modes
-	 * 
-	 * @param jobExecutionContext
-	 * @return
-	 */
-	public static boolean isSynchronousMode(
-			JobExecutionContext jobExecutionContext) {
-		GFacConfiguration gFacConfiguration = jobExecutionContext
-				.getGFacConfiguration();
-		if (ExecutionMode.ASYNCHRONOUS.equals(gFacConfiguration
-				.getExecutionMode())) {
-			return false;
-		}
-		return true;
-	}
+
+//	public static boolean isSynchronousMode(
+//			JobExecutionContext jobExecutionContext) {
+//		GFacConfiguration gFacConfiguration = jobExecutionContext
+//				.getGFacConfiguration();
+//		if (ExecutionMode.ASYNCHRONOUS.equals(gFacConfiguration
+//				.getExecutionMode())) {
+//			return false;
+//		}
+//		return true;
+//	}
 
 	public static String readFileToString(String file)
 			throws FileNotFoundException, IOException {
@@ -254,65 +221,61 @@ public class GFacUtils {
 		return buf.toString();
 	}
 
-	public static void saveJobStatus(JobExecutionContext jobExecutionContext,
-                                     JobDetails details, JobState state) throws GFacException {
-		try {
-            // first we save job details to the registry for sa and then save the job status.
-            ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
-            JobStatus status = new JobStatus();
-            status.setJobState(state);
-            details.setJobStatus(status);
-            experimentCatalog.add(ExpCatChildDataType.JOB_DETAIL, details,
-                    new CompositeIdentifier(jobExecutionContext.getTaskData()
-                            .getTaskID(), details.getJobID()));
-            JobIdentifier identifier = new JobIdentifier(details.getJobID(), jobExecutionContext.getTaskData().getTaskID(),
-                    jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
-                    jobExecutionContext.getGatewayID());
-            JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
-            jobExecutionContext.getLocalEventPublisher().publish(jobStatusChangeRequestEvent);
-        } catch (Exception e) {
-			throw new GFacException("Error persisting job status"
-					+ e.getLocalizedMessage(), e);
-		}
-	}
-
-	public static void updateJobStatus(JobExecutionContext jobExecutionContext,
-			JobDetails details, JobState state) throws GFacException {
-		try {
-			ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
-			JobStatus status = new JobStatus();
-			status.setJobState(state);
-			status.setTimeOfStateChange(Calendar.getInstance()
-					.getTimeInMillis());
-			details.setJobStatus(status);
-			experimentCatalog.update(
-					ExperimentCatalogModelType.JOB_DETAIL,
-					details, details.getJobID());
-		} catch (Exception e) {
-			throw new GFacException("Error persisting job status"
-					+ e.getLocalizedMessage(), e);
-		}
-	}
-
-	public static void saveErrorDetails(
-			JobExecutionContext jobExecutionContext, String errorMessage,
-			CorrectiveAction action, ErrorCategory errorCatogory)
-			throws GFacException {
-		try {
-			ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
-			ErrorDetails details = new ErrorDetails();
-			details.setActualErrorMessage(errorMessage);
-			details.setCorrectiveAction(action);
-			details.setActionableGroup(ActionableGroup.GATEWAYS_ADMINS);
-			details.setCreationTime(Calendar.getInstance().getTimeInMillis());
-			details.setErrorCategory(errorCatogory);
-			experimentCatalog.add(ExpCatChildDataType.ERROR_DETAIL, details,
-					jobExecutionContext.getTaskData().getTaskID());
-		} catch (Exception e) {
-			throw new GFacException("Error persisting job status"
-					+ e.getLocalizedMessage(), e);
-		}
-	}
+//	public static void saveJobStatus(JobExecutionContext jobExecutionContext,
+//                                     JobDetails details, JobState state) throws GFacException {
+//		try {
+//            // first we save job details to the registry for sa and then save the job status.
+//            ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
+//            JobStatus status = new JobStatus();
+//            status.setJobState(state);
+//            details.setJobStatus(status);
+//            experimentCatalog.add(ExpCatChildDataType.JOB_DETAIL, details,
+//                    new CompositeIdentifier(jobExecutionContext.getTaskData()
+//                            .getTaskID(), details.getJobID()));
+//            JobIdentifier identifier = new JobIdentifier(details.getJobID(), jobExecutionContext.getTaskData().getTaskID(),
+//                    jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
+//                    jobExecutionContext.getGatewayID());
+//            JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
+//            jobExecutionContext.getLocalEventPublisher().publish(jobStatusChangeRequestEvent);
+//        } catch (Exception e) {
+//			throw new GFacException("Error persisting job status"
+//					+ e.getLocalizedMessage(), e);
+//		}
+//	}
+
+//	public static void updateJobStatus(JobExecutionContext jobExecutionContext,
+//			JobDetails details, JobState state) throws GFacException {
+//		try {
+//			ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
+//			JobStatus status = new JobStatus();
+//			status.setJobState(state);
+//			status.setTimeOfStateChange(Calendar.getInstance()
+//					.getTimeInMillis());
+//			details.setJobStatus(status);
+//			experimentCatalog.update(
+//					ExperimentCatalogModelType.JOB_DETAIL,
+//					details, details.getJobID());
+//		} catch (Exception e) {
+//			throw new GFacException("Error persisting job status"
+//					+ e.getLocalizedMessage(), e);
+//		}
+//	}
+
+//	public static void saveErrorDetails(
+//			JobExecutionContext jobExecutionContext, String errorMessage)
+//			throws GFacException {
+//		try {
+//			ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
+//			ErrorModel details = new ErrorModel();
+//			details.setActualErrorMessage(errorMessage);
+//			details.setCreationTime(Calendar.getInstance().getTimeInMillis());
+//			experimentCatalog.add(ExpCatChildDataType.ERROR_DETAIL, details,
+//					jobExecutionContext.getTaskData().getTaskID());
+//		} catch (Exception e) {
+//			throw new GFacException("Error persisting job status"
+//					+ e.getLocalizedMessage(), e);
+//		}
+//	}
 
     public static Map<String, Object> getInputParamMap(List<InputDataObjectType> experimentData) throws GFacException {
         Map<String, Object> map = new HashMap<String, Object>();
@@ -330,155 +293,155 @@ public class GFacUtils {
         return map;
     }
 
-	public static GfacExperimentState getZKExperimentState(CuratorFramework curatorClient,
-			JobExecutionContext jobExecutionContext)
-			throws Exception {
-		String expState = AiravataZKUtils.getExpState(curatorClient, jobExecutionContext
-				.getExperimentID());
-        if (expState == null || expState.isEmpty()) {
-            return GfacExperimentState.UNKNOWN;
-        }
-        return GfacExperimentState.findByValue(Integer.valueOf(expState));
-    }
-
-	public static boolean createHandlerZnode(CuratorFramework curatorClient,
-                                             JobExecutionContext jobExecutionContext, String className)
-			throws Exception {
-		String expState = AiravataZKUtils.getExpZnodeHandlerPath(
-				jobExecutionContext.getExperimentID(), className);
-		Stat exists = curatorClient.checkExists().forPath(expState);
-		if (exists == null) {
-			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(expState, new byte[0]);
-			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
-		} else {
-			exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-			if (exists == null) {
-				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-						.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
-			}
-		}
-
-		exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-		if (exists != null) {
-			curatorClient.setData().withVersion(exists.getVersion())
-					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-							String.valueOf(GfacHandlerState.INVOKING.getValue()).getBytes());
-		}
-		return true;
-	}
-
-	public static boolean createHandlerZnode(CuratorFramework curatorClient,
-                                             JobExecutionContext jobExecutionContext, String className,
-                                             GfacHandlerState state) throws Exception {
-		String expState = AiravataZKUtils.getExpZnodeHandlerPath(
-				jobExecutionContext.getExperimentID(), className);
-		Stat exists = curatorClient.checkExists().forPath(expState);
-		if (exists == null) {
-			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-					.forPath(expState, new byte[0]);
-			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
-		} else {
-			exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-			if (exists == null) {
-				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-						.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
-			}
-		}
-
-		exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-		if (exists != null) {
-			curatorClient.setData().withVersion(exists.getVersion())
-					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-							String.valueOf(state.getValue()).getBytes());
-		}
-		return true;
-	}
-
-	public static boolean updateHandlerState(CuratorFramework curatorClient,
-                                             JobExecutionContext jobExecutionContext, String className,
-                                             GfacHandlerState state) throws Exception {
-		String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
-				jobExecutionContext.getExperimentID(), className);
-		Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-		if (exists != null) {
-			curatorClient.setData().withVersion(exists.getVersion())
-					.forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes());
-		} else {
-			createHandlerZnode(curatorClient, jobExecutionContext, className, state);
-		}
-		return false;
-	}
-
-	public static GfacHandlerState getHandlerState(CuratorFramework curatorClient,
-                                                  JobExecutionContext jobExecutionContext, String className) {
-		try {
-			String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath( jobExecutionContext.getExperimentID(), className);
-			Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-			if (exists != null) {
-				String stateVal = new String(curatorClient.getData().storingStatIn(exists)
-						.forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE));
-				return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
-			}
-			return GfacHandlerState.UNKNOWN; // if the node doesn't exist or any other error we
-							// return false
-		} catch (Exception e) {
-			log.error("Error occured while getting zk node status", e);
-			return null;
-		}
-	}
-
-	// This method is dangerous because of moving the experiment data
-	public static boolean createExperimentEntryForPassive(String experimentID,
-														  String taskID, CuratorFramework curatorClient, String experimentNode,
-														  String pickedChild, String tokenId, long deliveryTag) throws Exception {
-		String experimentPath = experimentNode + File.separator + pickedChild;
-		String newExperimentPath = experimentPath + File.separator + experimentID;
-		Stat exists1 = curatorClient.checkExists().forPath(newExperimentPath);
-		String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, curatorClient);
-		if (oldExperimentPath == null) {  // this means this is a very new experiment
-			// are going to create a new node
-			log.info("This is a new Job, so creating all the experiment docs from the scratch");
-			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath, new byte[0]);
-            String stateNodePath = curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-					.forPath(newExperimentPath + File.separator + "state",
-							String .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes());
-
-			if(curatorClient.checkExists().forPath(stateNodePath)!=null) {
-				log.info("Created the node: " + stateNodePath + " successfully !");
-			}else {
-				log.error("Error creating node: " + stateNodePath + " successfully !");
-			}
-			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-					.forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
-		} else {
-			log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
-            removeCancelDeliveryTagNode(oldExperimentPath, curatorClient); // remove previous cancel deliveryTagNode
-            if(newExperimentPath.equals(oldExperimentPath)){
-                log.info("Re-launch experiment came to the same GFac instance");
-            }else {
-				log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
-				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath,
-						curatorClient.getData().storingStatIn(exists1).forPath(oldExperimentPath)); // recursively copy children
-                copyChildren(curatorClient, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
-				String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
-				Stat exists = curatorClient.checkExists().forPath(oldDeliveryTag);
-				if(exists!=null) {
-					curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-							.forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
-									curatorClient.getData().storingStatIn(exists).forPath(oldDeliveryTag));
-					ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldDeliveryTag, true);
-				}
-				// After all the files are successfully transfered we delete the // old experiment,otherwise we do
-				// not delete a single file
-				log.info("After a successful copying of experiment data for an old experiment we delete the old data");
-				log.info("Deleting experiment data: " + oldExperimentPath);
-				ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldExperimentPath, true);
-			}
-		}
-		return true;
-	}
+//	public static GfacExperimentState getZKExperimentState(CuratorFramework curatorClient,
+//			JobExecutionContext jobExecutionContext)
+//			throws Exception {
+//		String expState = AiravataZKUtils.getExpState(curatorClient, jobExecutionContext
+//				.getExperimentID());
+//        if (expState == null || expState.isEmpty()) {
+//            return GfacExperimentState.UNKNOWN;
+//        }
+//        return GfacExperimentState.findByValue(Integer.valueOf(expState));
+//    }
+//
+//	public static boolean createHandlerZnode(CuratorFramework curatorClient,
+//                                             JobExecutionContext jobExecutionContext, String className)
+//			throws Exception {
+//		String expState = AiravataZKUtils.getExpZnodeHandlerPath(
+//				jobExecutionContext.getExperimentID(), className);
+//		Stat exists = curatorClient.checkExists().forPath(expState);
+//		if (exists == null) {
+//			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(expState, new byte[0]);
+//			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+//					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+//		} else {
+//			exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+//			if (exists == null) {
+//				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+//						.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+//			}
+//		}
+//
+//		exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+//		if (exists != null) {
+//			curatorClient.setData().withVersion(exists.getVersion())
+//					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+//							String.valueOf(GfacHandlerState.INVOKING.getValue()).getBytes());
+//		}
+//		return true;
+//	}
+
+//	public static boolean createHandlerZnode(CuratorFramework curatorClient,
+//                                             JobExecutionContext jobExecutionContext, String className,
+//                                             GfacHandlerState state) throws Exception {
+//		String expState = AiravataZKUtils.getExpZnodeHandlerPath(
+//				jobExecutionContext.getExperimentID(), className);
+//		Stat exists = curatorClient.checkExists().forPath(expState);
+//		if (exists == null) {
+//			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+//					.forPath(expState, new byte[0]);
+//			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+//					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+//		} else {
+//			exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+//			if (exists == null) {
+//				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+//						.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+//			}
+//		}
+//
+//		exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+//		if (exists != null) {
+//			curatorClient.setData().withVersion(exists.getVersion())
+//					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+//							String.valueOf(state.getValue()).getBytes());
+//		}
+//		return true;
+//	}
+
+//	public static boolean updateHandlerState(CuratorFramework curatorClient,
+//                                             JobExecutionContext jobExecutionContext, String className,
+//                                             GfacHandlerState state) throws Exception {
+//		String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
+//				jobExecutionContext.getExperimentID(), className);
+//		Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+//		if (exists != null) {
+//			curatorClient.setData().withVersion(exists.getVersion())
+//					.forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes());
+//		} else {
+//			createHandlerZnode(curatorClient, jobExecutionContext, className, state);
+//		}
+//		return false;
+//	}
+
+//	public static GfacHandlerState getHandlerState(CuratorFramework curatorClient,
+//                                                  JobExecutionContext jobExecutionContext, String className) {
+//		try {
+//			String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath( jobExecutionContext.getExperimentID(), className);
+//			Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+//			if (exists != null) {
+//				String stateVal = new String(curatorClient.getData().storingStatIn(exists)
+//						.forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE));
+//				return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
+//			}
+//			return GfacHandlerState.UNKNOWN; // if the node doesn't exist or any other error we
+//							// return false
+//		} catch (Exception e) {
+//			log.error("Error occured while getting zk node status", e);
+//			return null;
+//		}
+//	}
+
+//	// This method is dangerous because of moving the experiment data
+//	public static boolean createExperimentEntryForPassive(String experimentID,
+//														  String taskID, CuratorFramework curatorClient, String experimentNode,
+//														  String pickedChild, String tokenId, long deliveryTag) throws Exception {
+//		String experimentPath = experimentNode + File.separator + pickedChild;
+//		String newExperimentPath = experimentPath + File.separator + experimentID;
+//		Stat exists1 = curatorClient.checkExists().forPath(newExperimentPath);
+//		String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, curatorClient);
+//		if (oldExperimentPath == null) {  // this means this is a very new experiment
+//			// are going to create a new node
+//			log.info("This is a new Job, so creating all the experiment docs from the scratch");
+//			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath, new byte[0]);
+//            String stateNodePath = curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+//					.forPath(newExperimentPath + File.separator + "state",
+//							String .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes());
+//
+//			if(curatorClient.checkExists().forPath(stateNodePath)!=null) {
+//				log.info("Created the node: " + stateNodePath + " successfully !");
+//			}else {
+//				log.error("Error creating node: " + stateNodePath + " successfully !");
+//			}
+//			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+//					.forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
+//		} else {
+//			log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
+//            removeCancelDeliveryTagNode(oldExperimentPath, curatorClient); // remove previous cancel deliveryTagNode
+//            if(newExperimentPath.equals(oldExperimentPath)){
+//                log.info("Re-launch experiment came to the same GFac instance");
+//            }else {
+//				log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
+//				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath,
+//						curatorClient.getData().storingStatIn(exists1).forPath(oldExperimentPath)); // recursively copy children
+//                copyChildren(curatorClient, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
+//				String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
+//				Stat exists = curatorClient.checkExists().forPath(oldDeliveryTag);
+//				if(exists!=null) {
+//					curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+//							.forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
+//									curatorClient.getData().storingStatIn(exists).forPath(oldDeliveryTag));
+//					ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldDeliveryTag, true);
+//				}
+//				// After all the files are successfully transfered we delete the // old experiment,otherwise we do
+//				// not delete a single file
+//				log.info("After a successful copying of experiment data for an old experiment we delete the old data");
+//				log.info("Deleting experiment data: " + oldExperimentPath);
+//				ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldExperimentPath, true);
+//			}
+//		}
+//		return true;
+//	}
 
     private static void removeCancelDeliveryTagNode(String experimentPath, CuratorFramework curatorClient) throws Exception {
         Stat exists = curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
@@ -564,26 +527,26 @@ public class GFacUtils {
         return false;
     }
 
-    public static void saveHandlerData(JobExecutionContext jobExecutionContext,
-                                       StringBuffer data, String className) throws GFacHandlerException {
-		try {
-			CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
-			if (curatorClient != null) {
-				String expZnodeHandlerPath = AiravataZKUtils
-						.getExpZnodeHandlerPath(
-								jobExecutionContext.getExperimentID(),
-								className);
-				Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
-                if (exists != null) {
-					curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath, data.toString().getBytes());
-				} else {
-                    log.error("Saving Handler data failed, Stat is null");
-                }
-            }
-		} catch (Exception e) {
-			throw new GFacHandlerException(e);
-		}
-	}
+//    public static void saveHandlerData(JobExecutionContext jobExecutionContext,
+//                                       StringBuffer data, String className) throws GFacHandlerException {
+//		try {
+//			CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+//			if (curatorClient != null) {
+//				String expZnodeHandlerPath = AiravataZKUtils
+//						.getExpZnodeHandlerPath(
+//								jobExecutionContext.getExperimentID(),
+//								className);
+//				Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
+//                if (exists != null) {
+//					curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath, data.toString().getBytes());
+//				} else {
+//                    log.error("Saving Handler data failed, Stat is null");
+//                }
+//            }
+//		} catch (Exception e) {
+//			throw new GFacHandlerException(e);
+//		}
+//	}
 
 	public static String getHandlerData(JobExecutionContext jobExecutionContext, String className) throws Exception {
 		CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
@@ -685,31 +648,31 @@ public class GFacUtils {
 
     public static ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws RegistryException {
         ExperimentCatalog airavataExperimentCatalog = RegistryFactory.getDefaultExpCatalog();
-        Experiment details = (Experiment) airavataExperimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+        ExperimentModel details = (ExperimentModel) airavataExperimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
         if (details == null) {
-            details = new Experiment();
-            details.setExperimentID(experimentId);
+            details = new ExperimentModel();
+            details.setExperimentId(experimentId);
         }
-        org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
-        status.setExperimentState(state);
+        ExperimentStatus status = new ExperimentStatus();
+        status.setState(state);
         status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
-        if (!ExperimentState.CANCELED.equals(details.getExperimentStatus().getExperimentState()) &&
-                !ExperimentState.CANCELING.equals(details.getExperimentStatus().getExperimentState())) {
-            status.setExperimentState(state);
+        if (!ExperimentState.CANCELED.equals(details.getExperimentStatus().getState()) &&
+                !ExperimentState.CANCELING.equals(details.getExperimentStatus().getState())) {
+            status.setState(state);
         } else {
-            status.setExperimentState(details.getExperimentStatus().getExperimentState());
+            status.setState(details.getExperimentStatus().getState());
         }
         details.setExperimentStatus(status);
-        log.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getExperimentState().toString());
+        log.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getState().toString());
         airavataExperimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, experimentId);
-        return details.getExperimentStatus().getExperimentState();
+        return details.getExperimentStatus().getState();
     }
 
     public static boolean isFailedJob (JobExecutionContext jec) {
-        JobStatus jobStatus = jec.getJobDetails().getJobStatus();
-        if (jobStatus.getJobState() == JobState.FAILED) {
-            return true;
-        }
+//        JobStatus jobStatus = jec.getJobDetails().getJobStatus();
+//        if (jobStatus.getJobState() == JobState.FAILED) {
+//            return true;
+//        }
         return false;
     }
 
@@ -731,15 +694,297 @@ public class GFacUtils {
         return false;
     }
 
-    public static void publishTaskStatus (JobExecutionContext jobExecutionContext, LocalEventPublisher publisher, TaskState state){
-        TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                jobExecutionContext.getExperimentID(),
-                jobExecutionContext.getGatewayID());
-        publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
-    }
+//    public static void publishTaskStatus (JobExecutionContext jobExecutionContext, LocalEventPublisher publisher, TaskStatus state){
+//        TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+//                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+//                jobExecutionContext.getExperimentID(),
+//                jobExecutionContext.getGatewayID());
+//        publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
+//    }
 
 	public static String getZKGfacServersParentPath() {
 		return GFacConstants.ZOOKEEPER_SERVERS_NODE + GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE;
 	}
+
+    /**
+     * Builds the {@link JobDescriptor} for the process held by the given context: notification e-mail
+     * settings, directories and executable, command-line values taken from the process inputs and
+     * outputs, scheduling values, and the deployment's module-load, pre-job and post-job commands.
+     * @param processContext source of the process model, resource/deployment descriptions and remote cluster
+     * @return the populated job descriptor
+     * @throws GFacException propagated from the e-mail monitor check
+     * @throws AppCatalogException propagated from the app catalog lookups
+     * @throws ApplicationSettingsException propagated from the {@code ServerSettings} lookups
+     */
+    public static JobDescriptor createJobDescriptor (ProcessContext processContext) throws GFacException, AppCatalogException, ApplicationSettingsException {
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        String emailIds = null;
+        ProcessModel processModel = processContext.getProcessModel();
+        ResourceJobManager resourceJobManager = getResourceJobManager(processContext);
+        if (isEmailBasedJobMonitor(processContext)){
+            emailIds = ServerSettings.getEmailBasedMonitorAddress();
+        }
+        if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
+            String flags = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS);
+            if (flags != null && processContext.getComputeResourceDescription().getHostName().equals("stampede.tacc.xsede.org")) {
+                flags = "ALL";
+            }
+            jobDescriptor.setMailOptions(flags);
+
+            String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+            if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) {
+                if (emailIds != null && !emailIds.isEmpty()) {
+                    emailIds += ("," + userJobNotifEmailIds);
+                } else {
+                    emailIds = userJobNotifEmailIds;
+                }
+            }
+            if (processModel.isEnableEmailNotification()) {
+                List<String> emailList = processModel.getEmailAddresses();
+                String elist = GFacUtils.listToCsv(emailList, ',');
+                if (elist != null && !elist.isEmpty()) {
+                    if (emailIds != null && !emailIds.isEmpty()) {
+                        emailIds = emailIds + "," + elist;
+                    } else {
+                        emailIds = elist;
+                    }
+                }
+            }
+        }
+        if (emailIds != null && !emailIds.isEmpty()) {
+            log.info("Email list: " + emailIds);
+            jobDescriptor.setMailAddress(emailIds);
+        }
+
+        jobDescriptor.setInputDirectory(processContext.getInputDir());
+        jobDescriptor.setOutputDirectory(processContext.getOutputDir());
+        jobDescriptor.setExecutablePath(processContext.getApplicationDeploymentDescription().getExecutablePath());
+        jobDescriptor.setStandardOutFile(processContext.getStdoutLocation());
+        jobDescriptor.setStandardErrorFile(processContext.getStderrLocation());
+        String computationalProjectAccount = getComputeResourcePreference(processContext).getAllocationProjectNumber();
+        if (computationalProjectAccount != null) {
+            jobDescriptor.setAcountString(computationalProjectAccount);
+        }
+        // To make job name alpha numeric
+        jobDescriptor.setJobName("A" + String.valueOf(generateJobName()));
+        jobDescriptor.setWorkingDirectory(processContext.getWorkingDir());
+
+        List<String> inputValues = new ArrayList<String>();
+        // Sort a copy of the inputs by input order. The list sort is stable and keeps inputs that share
+        // an order value; a TreeSet ordered by the same key would treat them as duplicates and drop them.
+        List<InputDataObjectType> sortedInputs = new ArrayList<InputDataObjectType>(processModel.getProcessInputs());
+        Collections.sort(sortedInputs, new Comparator<InputDataObjectType>() {
+            @Override
+            public int compare(InputDataObjectType first, InputDataObjectType second) {
+                int firstOrder = first.getInputOrder();
+                int secondOrder = second.getInputOrder();
+                // explicit comparison instead of subtraction, which can overflow
+                return firstOrder < secondOrder ? -1 : (firstOrder == secondOrder ? 0 : 1);
+            }
+        });
+        for (InputDataObjectType inputDataObjectType : sortedInputs) {
+            if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
+                continue;
+            }
+            if (inputDataObjectType.getApplicationArgument() != null
+                    && !inputDataObjectType.getApplicationArgument().equals("")) {
+                inputValues.add(inputDataObjectType.getApplicationArgument());
+            }
+
+            if (inputDataObjectType.getValue() != null
+                    && !inputDataObjectType.getValue().equals("")) {
+                if (inputDataObjectType.getType() == DataType.URI) {
+                    // set only the relative path
+                    String filePath = inputDataObjectType.getValue();
+                    filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+                    inputValues.add(filePath);
+                }else {
+                    inputValues.add(inputDataObjectType.getValue());
+                }
+            }
+        }
+
+        List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs();
+        for (OutputDataObjectType output : processOutputs) {
+            if (output.getApplicationArgument() != null
+                    && !output.getApplicationArgument().equals("")) {
+                inputValues.add(output.getApplicationArgument());
+            }
+            if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) {
+                if (output.getType() == DataType.URI) {
+                    String filePath = output.getValue();
+                    filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+                    inputValues.add(filePath);
+                }
+            }
+        }
+
+        jobDescriptor.setInputValues(inputValues);
+        jobDescriptor.setUserName(processContext.getRemoteCluster().getServerInfo().getUserName());
+        jobDescriptor.setShellName("/bin/bash");
+        jobDescriptor.setAllEnvExport(true);
+        jobDescriptor.setOwner(processContext.getRemoteCluster().getServerInfo().getUserName());
+
+        ComputationalResourceSchedulingModel scheduling = processModel.getResourceSchedule();
+        if (scheduling != null) {
+            int totalNodeCount = scheduling.getNodeCount();
+            int totalCPUCount = scheduling.getTotalCPUCount();
+
+            if (scheduling.getQueueName() != null) {
+                jobDescriptor.setQueueName(scheduling.getQueueName());
+            }
+
+            if (totalNodeCount > 0) {
+                jobDescriptor.setNodes(totalNodeCount);
+            }
+
+            if (totalCPUCount > 0) {
+                // The node count may be 0 (it is only applied above when positive); avoid dividing by zero.
+                if (totalNodeCount > 0) {
+                    jobDescriptor.setProcessesPerNode(totalCPUCount / totalNodeCount);
+                }
+                jobDescriptor.setCPUCount(totalCPUCount);
+            }
+            if (scheduling.getWallTimeLimit() > 0) {
+                jobDescriptor.setMaxWallTime(String.valueOf(scheduling.getWallTimeLimit()));
+                if (resourceJobManager != null && resourceJobManager.getResourceJobManagerType() == ResourceJobManagerType.LSF) {
+                    jobDescriptor.setMaxWallTimeForLSF(String.valueOf(scheduling.getWallTimeLimit()));
+                }
+            }
+            if (scheduling.getTotalPhysicalMemory() > 0) {
+                jobDescriptor.setUsedMemory(scheduling.getTotalPhysicalMemory() + "");
+            }
+        } else {
+            log.error("Task scheduling cannot be null at this point..");
+        }
+        ApplicationDeploymentDescription appDepDescription = processContext.getApplicationDeploymentDescription();
+        List<String> moduleCmds = appDepDescription.getModuleLoadCmds();
+        if (moduleCmds != null) {
+            for (String moduleCmd : moduleCmds) {
+                jobDescriptor.addModuleLoadCommands(moduleCmd);
+            }
+        }
+        List<String> preJobCommands = appDepDescription.getPreJobCommands();
+        if (preJobCommands != null) {
+            for (String preJobCommand : preJobCommands) {
+                jobDescriptor.addPreJobCommand(parseCommand(preJobCommand, processContext));
+            }
+        }
+
+        List<String> postJobCommands = appDepDescription.getPostJobCommands();
+        if (postJobCommands != null) {
+            for (String postJobCommand : postJobCommands) {
+                jobDescriptor.addPostJobCommand(parseCommand(postJobCommand, processContext));
+            }
+        }
+
+        ApplicationParallelismType parallelism = appDepDescription.getParallelism();
+        boolean parallelJob = parallelism == ApplicationParallelismType.MPI
+                || parallelism == ApplicationParallelismType.OPENMP
+                || parallelism == ApplicationParallelismType.OPENMP_MPI;
+        if (parallelJob && resourceJobManager != null) {
+            Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands();
+            // For MPI/OpenMP applications the job manager's SUBMISSION command, when present, becomes the job submitter.
+            if (jobManagerCommands != null && jobManagerCommands.containsKey(JobManagerCommand.SUBMISSION)) {
+                jobDescriptor.setJobSubmitter(jobManagerCommands.get(JobManagerCommand.SUBMISSION));
+            }
+        }
+        return jobDescriptor;
+    }
+
+    private static int generateJobName() {
+        // Draw from [0, Integer.MAX_VALUE - offset) so adding the offset can never overflow.
+        // Wrapping and negating instead can produce Integer.MIN_VALUE, whose negation is
+        // still negative and would put a '-' into the job name.
+        final int offset = 99999999;
+        Random random = new Random();
+        int i = random.nextInt(Integer.MAX_VALUE - offset);
+        return i + offset;
+    }
+
+    private static String parseCommand(String value, ProcessContext context) {
+        // String.replace substitutes literally, so a '$' or '\' inside a directory path is not read as a regex group reference.
+        return value.replace("$workingDir", context.getWorkingDir())
+                .replace("$inputDir", context.getInputDir())
+                .replace("$outputDir", context.getOutputDir());
+    }
+
+    public static ResourceJobManager getResourceJobManager(ProcessContext processContext) {
+        // Yields the resource job manager of the preferred SSH or LOCAL job submission;
+        // any other protocol, a missing submission record, or a catalog failure yields null.
+        ResourceJobManager jobManager = null;
+        try {
+            JobSubmissionProtocol protocol = getPreferredJobSubmissionProtocol(processContext);
+            JobSubmissionInterface preferredInterface = getPreferredJobSubmissionInterface(processContext);
+            if (protocol == JobSubmissionProtocol.SSH) {
+                SSHJobSubmission ssh = GFacUtils.getSSHJobSubmission(preferredInterface.getJobSubmissionInterfaceId());
+                jobManager = (ssh == null) ? null : ssh.getResourceJobManager();
+            } else if (protocol == JobSubmissionProtocol.LOCAL) {
+                LOCALSubmission local = GFacUtils.getLocalJobSubmission(preferredInterface.getJobSubmissionInterfaceId());
+                jobManager = (local == null) ? null : local.getResourceJobManager();
+            }
+        } catch (AppCatalogException e) {
+            // best effort: log and fall through to the null result
+            log.error("Error occured while retrieving resource job manager", e);
+        }
+        return jobManager;
+    }
+
+    /** Returns true only when the preferred protocol is SSH and the SSH submission's monitor mode is JOB_EMAIL_NOTIFICATION_MONITOR. */
+    public static boolean isEmailBasedJobMonitor(ProcessContext processContext) throws GFacException, AppCatalogException {
+        JobSubmissionProtocol jobSubmissionProtocol = getPreferredJobSubmissionProtocol(processContext);
+        JobSubmissionInterface jobSubmissionInterface = getPreferredJobSubmissionInterface(processContext);
+        if (jobSubmissionProtocol != JobSubmissionProtocol.SSH) {
+            return false;
+        }
+        String jobSubmissionInterfaceId = jobSubmissionInterface.getJobSubmissionInterfaceId();
+        SSHJobSubmission sshJobSubmission = processContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+        // Same null guard as getResourceJobManager: a missing SSH submission record means "not e-mail monitored", not an NPE.
+        return sshJobSubmission != null && sshJobSubmission.getMonitorMode() == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR;
+    }
+
+    /**
+     * Returns the job submission interface with the lowest priority order (first one wins on ties) for the context's compute resource.
+     * @throws AppCatalogException if the catalog lookup fails or the resource defines no job submission interface
+     */
+    public static JobSubmissionInterface getPreferredJobSubmissionInterface (ProcessContext context) throws AppCatalogException {
+        String resourceHostId = context.getComputeResourceDescription().getComputeResourceId();
+        ComputeResourceDescription resourceDescription = context.getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+        List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces();
+        if (jobSubmissionInterfaces == null || jobSubmissionInterfaces.isEmpty()) {
+            throw new AppCatalogException("Compute resource should have at least one job submission interface defined...");
+        }
+        // Linear scan for the minimum: avoids an int-subtraction comparator (overflow) and leaves the description's list order untouched.
+        JobSubmissionInterface preferred = jobSubmissionInterfaces.get(0);
+        for (JobSubmissionInterface candidate : jobSubmissionInterfaces) {
+            if (candidate.getPriorityOrder() < preferred.getPriorityOrder()) {
+                preferred = candidate;
+            }
+        }
+        return preferred;
+    }
+
+    /**
+     * Returns the job submission protocol of the gateway's preference entry for the context's compute resource.
+     * @throws AppCatalogException if the lookup fails or yields no preference entry
+     */
+    public static JobSubmissionProtocol getPreferredJobSubmissionProtocol (ProcessContext context) throws AppCatalogException{
+        ComputeResourcePreference preference = getComputeResourcePreference(context);
+        if (preference == null) {
+            throw new AppCatalogException("No compute resource preference found for gateway " + context.getGatewayId());
+        }
+        return preference.getPreferredJobSubmissionProtocol();
+    }
+
+    public static  ComputeResourcePreference getComputeResourcePreference(ProcessContext context) throws AppCatalogException {
+        // Fetches the gateway profile's preference entry for the compute resource attached to the context.
+        final String failureMessage = "Error occurred while initializing app catalog";
+        try {
+            return context.getAppCatalog().getGatewayProfile().getComputeResourcePreference(context.getGatewayId(), context.getComputeResourceDescription().getComputeResourceId());
+        } catch (AppCatalogException e) {
+            log.error(failureMessage, e);
+            throw new AppCatalogException(failureMessage, e);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a2b6bdfd/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 16943fe..013e433 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -24,6 +24,9 @@ package org.apache.airavata.gfac.core.context;
 import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.registry.cpi.AppCatalog;
@@ -44,10 +47,17 @@ public class ProcessContext {
 	private final String tokenId;
 	private ProcessModel processModel;
 	private String workingDir;
+	private String inputDir;
+    private String outputDir;
 	private List<Task> taskChain;
 	private GatewayResourceProfile gatewayResourceProfile;
+    private ComputeResourceDescription computeResourceDescription;
+    private ApplicationDeploymentDescription applicationDeploymentDescription;
+    private ApplicationInterfaceDescription applicationInterfaceDescription;
 	private RemoteCluster remoteCluster;
 	private Map<String, String> sshProperties;
+    private String stdoutLocation;
+    private String stderrLocation;
 
 	public ProcessContext(String processId, String gatewayId, String tokenId) {
 		this.processId = processId;
@@ -148,4 +158,60 @@ public class ProcessContext {
 	public void setSshProperties(Map<String, String> sshProperties) {
 		this.sshProperties = sshProperties;
 	}
+
+    public ComputeResourceDescription getComputeResourceDescription() {
+        return computeResourceDescription; // field has no initializer: null until setComputeResourceDescription is called
+    }
+
+    public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) {
+        this.computeResourceDescription = computeResourceDescription;
+    }
+
+    public ApplicationDeploymentDescription getApplicationDeploymentDescription() {
+        return applicationDeploymentDescription; // field has no initializer: null until setApplicationDeploymentDescription is called
+    }
+
+    public void setApplicationDeploymentDescription(ApplicationDeploymentDescription applicationDeploymentDescription) {
+        this.applicationDeploymentDescription = applicationDeploymentDescription;
+    }
+
+    public ApplicationInterfaceDescription getApplicationInterfaceDescription() {
+        return applicationInterfaceDescription; // field has no initializer: null until setApplicationInterfaceDescription is called
+    }
+
+    public void setApplicationInterfaceDescription(ApplicationInterfaceDescription applicationInterfaceDescription) {
+        this.applicationInterfaceDescription = applicationInterfaceDescription;
+    }
+
+    public String getStdoutLocation() {
+        return stdoutLocation; // field has no initializer: null until setStdoutLocation is called
+    }
+
+    public void setStdoutLocation(String stdoutLocation) {
+        this.stdoutLocation = stdoutLocation;
+    }
+
+    public String getStderrLocation() {
+        return stderrLocation; // field has no initializer: null until setStderrLocation is called
+    }
+
+    public void setStderrLocation(String stderrLocation) {
+        this.stderrLocation = stderrLocation;
+    }
+
+    public void setOutputDir(String outputDir) {
+        this.outputDir = outputDir;
+    }
+
+    public String getOutputDir() {
+        return outputDir; // field has no initializer: null until setOutputDir is called
+    }
+
+    public String getInputDir() {
+        return inputDir; // field has no initializer: null until setInputDir is called
+    }
+
+    public void setInputDir(String inputDir) {
+        this.inputDir = inputDir;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a2b6bdfd/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
new file mode 100644
index 0000000..2696236
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.JobDescriptor;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class JobSubmissionTaskImpl implements JobSubmissionTask {
+    private static final Logger log = LoggerFactory.getLogger(JobSubmissionTaskImpl.class);
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+        // no-op: propertyMap is not read
+    }
+
+    @Override
+    public TaskState execute(TaskContext taskContext) throws TaskException {
+        try {
+            ProcessContext processContext = taskContext.getParentProcessContext();
+            AppCatalog appCatalog = processContext.getAppCatalog();
+            String resourceHostId = processContext.getProcessModel().getResourceSchedule().getResourceHostId();
+            ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
+            LocalEventPublisher publisher = processContext.getLocalEventPublisher();
+            RemoteCluster remoteCluster = processContext.getRemoteCluster();
+            JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext);
+        } catch (AppCatalogException e) {
+            log.error("Error while instantiating app catalog",e);
+            throw new TaskException("Error while instantiating app catalog", e);
+        } catch (ApplicationSettingsException e) {
+            log.error("Error occurred while creating job descriptor", e);
+            throw new TaskException("Error occurred while creating job descriptor", e);
+        } catch (GFacException e) {
+            log.error("Error occurred while creating job descriptor", e);
+            throw new TaskException("Error occurred while creating job descriptor", e);
+        }
+        // NOTE(review): the job descriptor is only built above; nothing in this method submits it,
+        // and callers receive null rather than a TaskState.
+        return null;
+    }
+
+    @Override
+    public TaskState recover(TaskContext taskContext) throws TaskException {
+        return null;
+    }
+}