You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by go...@apache.org on 2017/04/13 16:06:43 UTC

[1/2] airavata git commit: Add utility classes for worker

Repository: airavata
Updated Branches:
  refs/heads/feature-workload-mgmt 993bc1d07 -> aeaf35b46


http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
new file mode 100644
index 0000000..2a104dc
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
@@ -0,0 +1,194 @@
+package org.apache.airavata.worker.commons.utils;
+
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.status.*;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.utils.Constants;
+import org.apache.airavata.worker.commons.context.ProcessContext;
+import org.apache.airavata.worker.commons.context.TaskContext;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Created by goshenoy on 4/12/17.
+ */
+public class WorkerUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(WorkerUtils.class);
+
+    /**
+     * Stores an error against the experiment that the given process belongs to.
+     * <p>
+     * The error id of {@code errorModel} is replaced with a value obtained from
+     * {@code AiravataUtils.getId("EXP_ERROR")} before the model is added to the experiment
+     * catalog under the experiment id.
+     *
+     * @param processContext supplies the experiment catalog and the experiment/process ids
+     * @param errorModel     error to store; its error id is overwritten here
+     * @throws WorkerException if a {@link RegistryException} is raised while storing the error
+     */
+    public static void saveExperimentError(ProcessContext processContext, ErrorModel errorModel) throws WorkerException {
+        try {
+            ExperimentCatalog catalog = processContext.getExperimentCatalog();
+            String expId = processContext.getExperimentId();
+            errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
+            catalog.add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, expId);
+        } catch (RegistryException registryFailure) {
+            throw new WorkerException("expId: " + processContext.getExperimentId()
+                    + " processId: " + processContext.getProcessId()
+                    + " : - Error while updating experiment errors", registryFailure);
+        }
+    }
+
+    /**
+     * Stores an error against the given process.
+     * <p>
+     * The error id of {@code errorModel} is replaced with a value obtained from
+     * {@code AiravataUtils.getId("PROCESS_ERROR")} before the model is added to the experiment
+     * catalog under the process id.
+     *
+     * @param processContext supplies the experiment catalog and the experiment/process ids
+     * @param errorModel     error to store; its error id is overwritten here
+     * @throws WorkerException if a {@link RegistryException} is raised while storing the error
+     */
+    public static void saveProcessError(ProcessContext processContext, ErrorModel errorModel) throws WorkerException {
+        try {
+            ExperimentCatalog catalog = processContext.getExperimentCatalog();
+            errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
+            String processId = processContext.getProcessId();
+            catalog.add(ExpCatChildDataType.PROCESS_ERROR, errorModel, processId);
+        } catch (RegistryException registryFailure) {
+            throw new WorkerException("expId: " + processContext.getExperimentId()
+                    + " processId: " + processContext.getProcessId()
+                    + " : - Error while updating process errors", registryFailure);
+        }
+    }
+
+    public static void saveTaskError(TaskContext taskContext, ErrorModel errorModel) throws WorkerException {
+        try {
+            ExperimentCatalog experimentCatalog = taskContext.getParentProcessContext().getExperimentCatalog();
+            String taskId = taskContext.getTaskId();
+            errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR"));
+            experimentCatalog.add(ExpCatChildDataType.TASK_ERROR, errorModel, taskId);
+        } catch (RegistryException e) {
+            String msg = "expId: " + taskContext.getParentProcessContext().getExperimentId() + " processId: " + taskContext.getParentProcessContext().getProcessId() + " taskId: " + taskContext.getTaskId()
+                    + " : - Error while updating task errors";
+            throw new WorkerException(msg, e);
+        }
+    }
+
+    /**
+     * Reacts to an interrupt raised on the given process.
+     * <ul>
+     *   <li>Cancel: the process status is saved and published twice, first as
+     *       {@code CANCELLING} and then as {@code CANCELED}. No actual cancel operation is
+     *       performed between the two yet.</li>
+     *   <li>Hand-over: recognised, but nothing is done.</li>
+     *   <li>Anything else: an error is logged.</li>
+     * </ul>
+     *
+     * @param processContext context of the interrupted process
+     * @throws WorkerException if saving or publishing one of the statuses fails
+     */
+    public static void handleProcessInterrupt(ProcessContext processContext) throws WorkerException {
+        if (!processContext.isCancel()) {
+            // Hand-over currently has no handling; every other interrupt is unexpected.
+            if (!processContext.isHandOver()) {
+                logger.error("expId: {}, processId: {} :- Unknown process interrupt", processContext.getExperimentId(),
+                        processContext.getProcessId());
+            }
+            return;
+        }
+
+        // Phase 1: announce that cancellation has started.
+        ProcessStatus cancelStatus = new ProcessStatus(ProcessState.CANCELLING);
+        cancelStatus.setReason("Process Cancel triggered");
+        cancelStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        processContext.setProcessStatus(cancelStatus);
+        saveAndPublishProcessStatus(processContext);
+        // do cancel operation here
+
+        // Phase 2: the same status instance is moved on to CANCELED and published again.
+        cancelStatus.setState(ProcessState.CANCELED);
+        processContext.setProcessStatus(cancelStatus);
+        cancelStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        saveAndPublishProcessStatus(processContext);
+    }
+
+    /**
+     * Looks up the job model recorded for the given process.
+     * <p>
+     * All {@code JOB} entries whose process id matches are fetched from the experiment catalog.
+     * The first one carrying a non-null, non-empty job id is returned. If no entry has a usable
+     * job id, the last entry examined is returned.
+     *
+     * @param processContext supplies the experiment catalog and the process id to search by
+     * @return the matching job model, or {@code null} when the catalog returns {@code null} or
+     *         no entries
+     * @throws RegistryException if the catalog lookup fails
+     */
+    public static JobModel getJobModel(ProcessContext processContext) throws RegistryException {
+        ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+        List<Object> objects = experimentCatalog.get(ExperimentCatalogModelType.JOB,
+                Constants.FieldConstants.JobConstants.PROCESS_ID, processContext.getProcessId());
+        JobModel jobModel = null;
+        if (objects != null) {
+            for (Object object : objects) {
+                jobModel = (JobModel) object;
+                // The previous check compared the JobModel itself with "" and OR-ed the result,
+                // which made it always true; the job id is what has to be non-empty.
+                String jobId = jobModel.getJobId();
+                if (jobId != null && !jobId.isEmpty()) {
+                    return jobModel;
+                }
+            }
+        }
+        return jobModel;
+    }
+
+    /**
+     * Splits a comma-separated task DAG string into its task ids.
+     * <p>
+     * Entries are not trimmed. The result is the fixed-size list view produced by
+     * {@link Arrays#asList(Object[])} over the split array.
+     *
+     * @param taskDag comma-separated task ids; must not be {@code null}
+     * @return the task ids in the order they appear in {@code taskDag}
+     */
+    public static List<String> parseTaskDag(String taskDag) {
+        // TODO - parse taskDag and create taskId list
+        return Arrays.asList(taskDag.split(","));
+    }
+
+    public static void saveAndPublishTaskStatus(TaskContext taskContext) throws WorkerException {
+        try {
+            TaskState state = taskContext.getTaskState();
+            // first we save job jobModel to the registry for sa and then save the job status.
+            ProcessContext processContext = taskContext.getParentProcessContext();
+            ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+            TaskStatus status = taskContext.getTaskStatus();
+            if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){
+                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            }else {
+                status.setTimeOfStateChange(status.getTimeOfStateChange());
+            }
+            experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, status, taskContext.getTaskId());
+            TaskIdentifier identifier = new TaskIdentifier(taskContext.getTaskId(),
+                    processContext.getProcessId(), processContext.getProcessModel().getExperimentId(),
+                    processContext.getGatewayId());
+            TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(state,
+                    identifier);
+            MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
+                    (MessageType.TASK.name()), taskContext.getParentProcessContext().getGatewayId());
+            msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            processContext.getStatusPublisher().publish(msgCtx);
+        } catch (Exception e) {
+            throw new WorkerException("Error persisting task status"
+                    + e.getLocalizedMessage(), e);
+        }
+    }
+
+    public static void saveAndPublishProcessStatus(ProcessContext processContext) throws WorkerException {
+        try {
+            // first we save job jobModel to the registry for sa and then save the job status.
+            ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+            ProcessStatus status = processContext.getProcessStatus();
+            if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){
+                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            }else {
+                status.setTimeOfStateChange(status.getTimeOfStateChange());
+            }
+            experimentCatalog.add(ExpCatChildDataType.PROCESS_STATUS, status, processContext.getProcessId());
+            ProcessIdentifier identifier = new ProcessIdentifier(processContext.getProcessId(),
+                    processContext.getProcessModel().getExperimentId(),
+                    processContext.getGatewayId());
+            ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
+            MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+                    AiravataUtils.getId(MessageType.PROCESS.name()), processContext.getGatewayId());
+            msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            processContext.getStatusPublisher().publish(msgCtx);
+        } catch (Exception e) {
+            throw new WorkerException("Error persisting process status"
+                    + e.getLocalizedMessage(), e);
+        }
+    }
+
+    public static void saveJobStatus(ProcessContext processContext, JobModel jobModel) throws WorkerException {
+        try {
+            // first we save job jobModel to the registry for sa and then save the job status.
+            JobStatus jobStatus = null;
+            if(jobModel.getJobStatuses() != null)
+                jobStatus = jobModel.getJobStatuses().get(0);
+
+            ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+            List<JobStatus> statuses = new ArrayList<>();
+            statuses.add(jobStatus);
+            jobModel.setJobStatuses(statuses);
+            if (jobStatus.getTimeOfStateChange() == 0 || jobStatus.getTimeOfStateChange() > 0 ){
+                jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            }else {
+                jobStatus.setTimeOfStateChange(jobStatus.getTimeOfStateChange());
+            }
+            CompositeIdentifier ids = new CompositeIdentifier(jobModel.getTaskId(), jobModel.getJobId());
+            experimentCatalog.add(ExpCatChildDataType.JOB_STATUS, jobStatus, ids);
+            JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), jobModel.getTaskId(),
+                    processContext.getProcessId(), processContext.getProcessModel().getExperimentId(),
+                    processContext.getGatewayId());
+            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
+            MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
+                    (MessageType.JOB.name()), processContext.getGatewayId());
+            msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            processContext.getStatusPublisher().publish(msgCtx);
+        } catch (Exception e) {
+            throw new WorkerException("Error persisting job status"
+                    + e.getLocalizedMessage(), e);
+        }
+    }
+}


[2/2] airavata git commit: Add utility classes for worker

Posted by go...@apache.org.
Add utility classes for worker


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/aeaf35b4
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/aeaf35b4
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/aeaf35b4

Branch: refs/heads/feature-workload-mgmt
Commit: aeaf35b4615374c00f7477de9ae46a36f69ed88b
Parents: 993bc1d
Author: Gourav Shenoy <go...@apache.org>
Authored: Thu Apr 13 12:06:40 2017 -0400
Committer: Gourav Shenoy <go...@apache.org>
Committed: Thu Apr 13 12:06:40 2017 -0400

----------------------------------------------------------------------
 .../impl/AuroraJobSubmissionTask.java           | 160 ++++++
 .../impl/BESJobSubmissionTask.java              | 501 +++++++++++++++++++
 .../impl/DefaultJobSubmissionTask.java          | 377 ++++++++++++++
 .../impl/ForkJobSubmissionTask.java             | 176 +++++++
 .../impl/LocalJobSubmissionTask.java            | 202 ++++++++
 .../jobsubmission/utils/JobSubmissionUtils.java |   7 +
 .../worker/task/jobsubmission/utils/sample      |   0
 .../commons/cluster/AbstractRemoteCluster.java  |   4 +-
 .../commons/utils/JobManagerConfiguration.java  |  34 ++
 .../worker/commons/utils/WorkerUtils.java       | 194 +++++++
 10 files changed, 1653 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/AuroraJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/AuroraJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/AuroraJobSubmissionTask.java
new file mode 100644
index 0000000..2e55fe7
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/AuroraJobSubmissionTask.java
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.airavata.cloud.aurora.client.AuroraThriftClient;
+import org.apache.airavata.cloud.aurora.client.bean.IdentityBean;
+import org.apache.airavata.cloud.aurora.client.bean.JobConfigBean;
+import org.apache.airavata.cloud.aurora.client.bean.JobKeyBean;
+import org.apache.airavata.cloud.aurora.client.bean.ProcessBean;
+import org.apache.airavata.cloud.aurora.client.bean.ResourceBean;
+import org.apache.airavata.cloud.aurora.client.bean.ResponseBean;
+import org.apache.airavata.cloud.aurora.client.bean.TaskConfigBean;
+import org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.GroovyMap;
+import org.apache.airavata.gfac.core.Script;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.AuroraUtils;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job submission task that creates a job on an Aurora cluster through
+ * {@link AuroraThriftClient}.
+ */
+public class AuroraJobSubmissionTask implements JobSubmissionTask {
+
+    private static final Logger log = LoggerFactory.getLogger(AuroraJobSubmissionTask.class);
+
+    /**
+     * Reports an {@code ACTIVE} job status; no cancel request is issued from here.
+     */
+    @Override
+    public JobStatus cancel(TaskContext taskcontext) throws TaskException {
+        JobStatus activeStatus = new JobStatus();
+        activeStatus.setJobState(JobState.ACTIVE);
+        return activeStatus;
+    }
+
+    /**
+     * Does nothing; the property map is ignored.
+     */
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+
+    }
+
+    /**
+     * Generates the job script, submits it to Aurora as a job and records the outcome.
+     * <p>
+     * On success the job model (id, name, description, a single {@code SUBMITTED} status) is saved
+     * and the returned task status is {@code COMPLETED}. Any {@link Throwable} raised during
+     * submission is logged and turned into a {@code FAILED} task status with an error model
+     * attached to the task model. In both cases the task status is set on the context and then
+     * saved and published; a failure of that last step is only logged.
+     *
+     * @param taskContext context of the task being executed
+     * @return the resulting task status
+     */
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        // Starts as COMPLETED; the failure path below switches it to FAILED.
+        TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED);
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        JobModel jobModel = processContext.getJobModel();
+        jobModel.setTaskId(taskContext.getTaskId());
+        // The generated name is used both as the job name and as the job id.
+        String jobName = "A" + GFacUtils.generateJobName();
+        jobModel.setJobName(jobName);
+        JobStatus submittedStatus = new JobStatus();
+        submittedStatus.setJobState(JobState.SUBMITTED);
+
+        try {
+            JobKeyBean jobKey = new JobKeyBean(AuroraUtils.ENVIRONMENT, AuroraUtils.ROLE, jobName);
+            IdentityBean owner = new IdentityBean(AuroraUtils.ROLE);
+            GroovyMap scriptParams = GFacUtils.createGroovyMap(processContext, taskContext);
+            scriptParams.add(Script.JOB_SUBMITTER_COMMAND, "sh");
+            String script = GFacUtils.generateScript(scriptParams,
+                    GFacUtils.getTemplateFileName(ResourceJobManagerType.CLOUD));
+
+            // Insertion order is kept: main process first, then the optional log copies.
+            Set<ProcessBean> processes = new LinkedHashSet<>();
+            processes.add(new ProcessBean("main_process", script, false));
+
+            scriptParams.getStringValue(Script.STANDARD_OUT_FILE).ifPresent(stdoutTarget ->
+                    processes.add(new ProcessBean("stdout_copy_process",
+                            "cp .logs/main_process/0/stdout " + stdoutTarget, false)));
+
+            scriptParams.getStringValue(Script.STANDARD_ERROR_FILE).ifPresent(stderrTarget ->
+                    processes.add(new ProcessBean("stderr_copy_process",
+                            "cp .logs/main_process/0/stderr " + stderrTarget, false)));
+
+            ResourceBean resources = new ResourceBean(1.5, 512, 512);
+            TaskConfigBean taskConfig = new TaskConfigBean("Airavata-Aurora-" + jobName, processes, resources);
+            JobConfigBean jobConfig = new JobConfigBean(jobKey, owner, taskConfig, AuroraUtils.CLUSTER);
+
+            log.info("Executor Config for Job {} , {}", jobName,
+                    AuroraThriftClientUtil.getExecutorConfigJson(jobConfig));
+
+            ResponseBean response = AuroraThriftClient.getAuroraThriftClient().createJob(jobConfig);
+            log.info("Response for job {}, {}", jobName, response);
+
+            jobModel.setJobDescription(resources.toString());
+            jobModel.setJobId(jobName);
+            submittedStatus.setReason("Successfully Submitted");
+            jobModel.setJobStatuses(Arrays.asList(submittedStatus));
+            submittedStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            taskContext.getParentProcessContext().setJobModel(jobModel);
+
+            GFacUtils.saveJobModel(processContext, jobModel);
+            GFacUtils.saveJobStatus(processContext, jobModel);
+            taskStatus.setReason("Successfully submitted job to Aurora");
+        } catch (Throwable e) {
+            recordFailure(taskContext, taskStatus, e);
+        }
+
+        taskContext.setTaskStatus(taskStatus);
+        try {
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+        } catch (GFacException e) {
+            log.error("Error while saving task status", e);
+        }
+        return taskStatus;
+    }
+
+    /**
+     * Logs a submission failure, marks the task status as {@code FAILED} and attaches an error
+     * model describing the failure to the task model.
+     */
+    private static void recordFailure(TaskContext taskContext, TaskStatus taskStatus, Throwable cause) {
+        String msg = "Error occurred while submitting Aurora job";
+        log.error(msg, cause);
+        taskStatus.setState(TaskState.FAILED);
+        taskStatus.setReason(msg);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        ErrorModel errorModel = new ErrorModel();
+        errorModel.setActualErrorMessage(cause.getMessage());
+        errorModel.setUserFriendlyMessage(msg);
+        taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+    }
+
+    /**
+     * Recovery simply runs {@link #execute(TaskContext)} again.
+     */
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        return execute(taskContext);
+    }
+
+    /**
+     * @return {@link TaskTypes#JOB_SUBMISSION}
+     */
+    @Override
+    public TaskTypes getType() {
+        return TaskTypes.JOB_SUBMISSION;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
new file mode 100644
index 0000000..2035d75
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
@@ -0,0 +1,501 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import de.fzj.unicore.bes.client.ActivityClient;
+import de.fzj.unicore.bes.client.FactoryClient;
+import de.fzj.unicore.bes.faults.UnknownActivityIdentifierFault;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
+import eu.unicore.util.httpclient.DefaultClientConfiguration;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.gfac.impl.task.utils.bes.*;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
+import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.experiment.UserConfigurationDataModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.xmlbeans.XmlCursor;
+import org.ggf.schemas.bes.x2006.x08.besFactory.*;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3.x2005.x08.addressing.EndpointReferenceType;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+
+public class BESJobSubmissionTask implements JobSubmissionTask {
+    private static final Logger log = LoggerFactory.getLogger(BESJobSubmissionTask.class);
+    private DefaultClientConfiguration secProperties;
+
+    private String jobId;
+    private String hostName;
+    private String userName;
+    private String inputPath;
+    private int DEFAULT_SSH_PORT = 22;
+    private AuthenticationInfo authenticationInfo;
+
+    @Override
+    public JobStatus cancel(TaskContext taskcontext) throws TaskException {
+        return null;
+    }
+
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+    }
+
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+        StorageClient sc = null;
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        // FIXME - use original output dir
+        setInputOutputLocations(processContext);
+        try {
+            // con't reuse if UserDN has been changed.
+            secProperties = getSecurityConfig(processContext);
+            // try secProperties = secProperties.clone() if we can't use already initialized ClientConfigurations.
+        } catch (GFacException e) {
+            String msg = "Unicorn security context initialization error";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            return taskStatus;
+        }
+
+        try {
+            JobSubmissionProtocol protocol = processContext.getJobSubmissionProtocol();
+            JobSubmissionInterface jobSubmissionInterface = GFacUtils.getPreferredJobSubmissionInterface(processContext);
+            String factoryUrl = null;
+            if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
+                UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(
+                        jobSubmissionInterface.getJobSubmissionInterfaceId());
+                factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
+            }
+            EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance();
+            eprt.addNewAddress().setStringValue(factoryUrl);
+            String userDN = processContext.getProcessModel().getUserDn();
+
+            CreateActivityDocument cad = CreateActivityDocument.Factory.newInstance();
+
+            // create storage
+            StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, null);
+            sc = storageCreator.createStorage();
+
+            JobDefinitionType jobDefinition = JSDLGenerator.buildJSDLInstance(processContext, sc.getUrl()).getJobDefinition();
+            cad.addNewCreateActivity().addNewActivityDocument().setJobDefinition(jobDefinition);
+
+            log.info("Submitted JSDL: " + jobDefinition.getJobDescription());
+
+            // copy files to local
+            copyInputFilesToLocal(taskContext);
+            // upload files if any
+            DataTransferrer dt = new DataTransferrer(processContext, sc);
+            dt.uploadLocalFiles();
+
+            JobModel jobDetails = new JobModel();
+            jobDetails.setTaskId(taskContext.getTaskId());
+            jobDetails.setProcessId(taskContext.getProcessId());
+            FactoryClient factory = new FactoryClient(eprt, secProperties);
+
+            log.info("Activity Submitting to {} ... \n", factoryUrl);
+            CreateActivityResponseDocument response = factory.createActivity(cad);
+            log.info("Activity Submitted to {} ... \n", factoryUrl);
+
+            EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
+
+            log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
+
+            // factory.waitWhileActivityIsDone(activityEpr, 1000);
+            jobId = WSUtilities.extractResourceID(activityEpr);
+            if (jobId == null) {
+                jobId = new Long(Calendar.getInstance().getTimeInMillis())
+                        .toString();
+            }
+            log.info("JobID: " + jobId);
+            jobDetails.setJobId(jobId);
+            jobDetails.setJobDescription(activityEpr.toString());
+            jobDetails.setJobStatuses(Arrays.asList(new JobStatus(JobState.SUBMITTED)));
+            processContext.setJobModel(jobDetails);
+            GFacUtils.saveJobModel(processContext, jobDetails);
+            GFacUtils.saveJobStatus(processContext, jobDetails);
+            log.info(formatStatusMessage(activityEpr.getAddress()
+                    .getStringValue(), factory.getActivityStatus(activityEpr)
+                    .toString()));
+
+            waitUntilDone(eprt, activityEpr, processContext, secProperties);
+
+            ActivityStatusType activityStatus = null;
+            activityStatus = getStatus(factory, activityEpr);
+            log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString()));
+            ActivityClient activityClient;
+            activityClient = new ActivityClient(activityEpr, secProperties);
+            // now use the activity working directory property
+            dt.setStorageClient(activityClient.getUspaceClient());
+
+            List<OutputDataObjectType> copyOutput = null;
+
+            if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
+                String error = activityStatus.getFault().getFaultcode()
+                        .getLocalPart()
+                        + "\n"
+                        + activityStatus.getFault().getFaultstring()
+                        + "\n EXITCODE: " + activityStatus.getExitCode();
+                log.error(error);
+
+                JobState applicationJobStatus = JobState.FAILED;
+                jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus)));
+                sendNotification(processContext, jobDetails);
+                try {Thread.sleep(5000);} catch (InterruptedException e) {}
+
+                //What if job is failed before execution and there are not stdouts generated yet?
+                log.debug("Downloading any standard output and error files, if they were produced.");
+                copyOutput = dt.downloadRemoteFiles();
+
+            } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
+                JobState applicationJobStatus = JobState.CANCELED;
+                jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus)));
+                GFacUtils.saveJobStatus(processContext, jobDetails);
+                throw new GFacException(
+                        processContext.getExperimentId() + "Job Canceled");
+            } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException ignored) {
+                }
+                JobState applicationJobStatus = JobState.COMPLETE;
+                jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus)));
+                GFacUtils.saveJobStatus(processContext, jobDetails);
+                log.info("Job Id: {}, exit code: {}, exit status: {}", jobDetails.getJobId(),
+                        activityStatus.getExitCode(), ActivityStateEnumeration.FINISHED.toString());
+
+//                if (activityStatus.getExitCode() == 0) {
+//                } else {
+//                    dt.downloadStdOuts();
+//                }
+                copyOutput = dt.downloadRemoteFiles();
+            }
+            if (copyOutput != null) {
+                copyOutputFilesToStorage(taskContext, copyOutput);
+                for (OutputDataObjectType outputDataObjectType : copyOutput) {
+                    GFacUtils.saveExperimentOutput(processContext, outputDataObjectType.getName(), outputDataObjectType.getValue());
+                }
+            }
+//            dt.publishFinalOutputs();
+            taskStatus.setState(TaskState.COMPLETED);
+        } catch (AppCatalogException e) {
+            log.error("Error while retrieving UNICORE job submission.." , e);
+            taskStatus.setState(TaskState.FAILED);
+        } catch (Exception e) {
+            log.error("BES task failed... ", e);
+            taskStatus.setState(TaskState.FAILED);
+        }
+
+        return taskStatus;
+    }
+
+    /**
+     * Uploads the locally available output files over SCP and rewrites each output's value to the
+     * URI it was copied to.
+     *
+     * Only outputs of type STDERR, STDOUT, STRING and URI are handled; every other type is left untouched.
+     * Relies on the {@code hostName} and {@code inputPath} fields, which are assigned in
+     * {@code copyInputFilesToLocal}.
+     *
+     * @param taskContext task whose parent process context supplies credentials and server info
+     * @param copyOutput  outputs whose values point at local files; values are updated in place
+     * @throws GFacException if the credentials cannot be loaded or an SCP transfer fails
+     */
+    private void copyOutputFilesToStorage(TaskContext taskContext, List<OutputDataObjectType> copyOutput) throws GFacException {
+        ProcessContext pc = taskContext.getParentProcessContext();
+        String remoteFilePath = null, fileName = null, localFilePath = null;
+        try {
+            authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+            // NOTE(review): storage credentials are combined with the *compute* resource server info here,
+            // while copyInputFilesToLocal uses getStorageResourceServerInfo() - confirm this is intended.
+            ServerInfo serverInfo = pc.getComputeResourceServerInfo();
+            Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+            for (OutputDataObjectType output : copyOutput) {
+                switch (output.getType()) {
+                    case STDERR: case STDOUT: case STRING: case URI:
+                        localFilePath = output.getValue();
+                        int schemeSeparator = localFilePath.indexOf("://");
+                        if (schemeSeparator >= 0) {
+                            // Skip the scheme, the ':' and the first '/'; the remainder is used as the local path.
+                            localFilePath = localFilePath.substring(schemeSeparator + 2);
+                        }
+                        fileName = localFilePath.substring(localFilePath.lastIndexOf("/") + 1);
+                        URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
+                        remoteFilePath = destinationURI.getPath();
+                        log.info("SCP local file :{} -> to remote :{}", localFilePath, remoteFilePath);
+                        SSHUtils.scpTo(localFilePath, remoteFilePath, sshSession);
+                        output.setValue(destinationURI.toString());
+                        break;
+                    default:
+                        break;
+                }
+            }
+        } catch (IOException | JSchException | SSHApiException | URISyntaxException | CredentialStoreException e) {
+            log.error("Error while copying local file " + localFilePath + " to remote " + remoteFilePath, e);
+            throw new GFacException("Error while scp output files to remote storage file location", e);
+        }
+    }
+
+    /**
+     * Fetches every URI-typed process input over SCP into the process input directory and
+     * rewrites the input value to a {@code file:/} reference to the local copy.
+     *
+     * As a side effect the {@code hostName} and {@code inputPath} fields are initialised from the
+     * storage resource of the process.
+     *
+     * @param taskContext task whose parent process context describes the storage resource and inputs
+     * @throws GFacException if no storage resource is configured, the credentials cannot be
+     *                       loaded, or a transfer fails
+     */
+    private void copyInputFilesToLocal(TaskContext taskContext) throws GFacException {
+        ProcessContext pc = taskContext.getParentProcessContext();
+        StorageResourceDescription storage = pc.getStorageResource();
+        if (storage == null) {
+            throw new GFacException("Storage Resource is null");
+        }
+        hostName = storage.getHostName();
+
+        // Keep the storage root terminated by a separator.
+        inputPath = pc.getStorageFileSystemRootLocation();
+        if (!inputPath.endsWith(File.separator)) {
+            inputPath = inputPath + File.separator;
+        }
+
+        // Declared outside the try block because the error log below reports them.
+        String sourcePath = null;
+        String targetPath = null;
+        try {
+            authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+            ServerInfo storageServer = pc.getStorageResourceServerInfo();
+            Session session = Factory.getSSHSession(authenticationInfo, storageServer);
+
+            for (InputDataObjectType input : pc.getProcessModel().getProcessInputs()) {
+                if (input.getType() != DataType.URI) {
+                    continue;
+                }
+                sourcePath = new URI(input.getValue()).getPath();
+                String baseName = sourcePath.substring(sourcePath.lastIndexOf("/") + 1);
+                targetPath = pc.getInputDir() + File.separator + baseName;
+                log.info("SCP remote file :{} -> to local :{}", sourcePath, targetPath);
+                SSHUtils.scpFrom(sourcePath, targetPath, session);
+                input.setValue("file:/" + targetPath);
+            }
+        } catch (IOException | JSchException | SSHApiException | URISyntaxException e) {
+            log.error("Error while coping remote file " + sourcePath + " to local " + targetPath, e);
+            throw new GFacException("Error while scp input files to local file location", e);
+        } catch (CredentialStoreException e) {
+            String msg = "Authentication issue, make sure you are passing valid credential token";
+            log.error(msg, e);
+            throw new GFacException(msg, e);
+        }
+    }
+
+    /**
+     * Points both the input and the output directory of the process at a per-process folder
+     * below the JVM temp directory ({@code java.io.tmpdir}/&lt;processId&gt;).
+     *
+     * The folder is created if it does not exist yet; a failure to create it is logged, because
+     * later file transfers into that folder would otherwise fail without any hint of the cause.
+     *
+     * @param processContext process whose input/output directories are set
+     */
+    private void setInputOutputLocations(ProcessContext processContext) {
+        String localPath = System.getProperty("java.io.tmpdir") + File.separator + processContext.getProcessId();
+        File localDir = new File(localPath);
+        // mkdirs() returns false both on failure and when the directory already exists, hence the isDirectory() check.
+        if (!localDir.isDirectory() && !localDir.mkdirs()) {
+            log.warn("Could not create local working directory {}", localPath);
+        }
+
+        processContext.setInputDir(localPath);
+        processContext.setOutputDir(localPath);
+    }
+
+    /**
+     * Builds the UNICORE client configuration for the given process.
+     *
+     * The user configuration data is read from the experiment catalog; certificate generation is
+     * currently forced on (see FIXME), so the configuration is always built from that model.
+     *
+     * @param pc process context giving access to the security context and the experiment catalog
+     * @return the client configuration produced by the UNICORE security context
+     * @throws GFacException if the registry lookup fails or the default configuration cannot be retrieved
+     */
+    private DefaultClientConfiguration getSecurityConfig(ProcessContext pc) throws GFacException {
+        try {
+            UNICORESecurityContext securityContext = SecurityUtils.getSecurityContext(pc);
+            UserConfigurationDataModel userConfig = (UserConfigurationDataModel) pc.getExperimentCatalog()
+                    .get(ExperimentCatalogModelType.USER_CONFIGURATION_DATA, pc.getExperimentId());
+            // FIXME - remove following setter lines, and use original value comes with user configuration data model.
+            userConfig.setGenerateCert(true);
+            return userConfig.isGenerateCert()
+                    ? securityContext.getDefaultConfiguration(false, userConfig)
+                    : securityContext.getDefaultConfiguration(false);
+        } catch (RegistryException e) {
+            throw new GFacException("Error! reading user configuration data from registry", e);
+        } catch (ApplicationSettingsException e) {
+            throw new GFacException("Error! retrieving default client configurations", e);
+        }
+    }
+
+    protected String formatStatusMessage(String activityUrl, String status) {
+        return String.format("Activity %s is %s.\n", activityUrl, status);
+    }
+
+    protected void waitUntilDone(EndpointReferenceType factoryEpr, EndpointReferenceType activityEpr, ProcessContext processContext, DefaultClientConfiguration secProperties) throws Exception {
+
+        try {
+            FactoryClient factoryClient = new FactoryClient(factoryEpr, secProperties);
+            JobState applicationJobStatus = null;
+
+            ActivityStateEnumeration.Enum activityStatus = factoryClient.getActivityStatus(activityEpr);
+            while ((activityStatus != ActivityStateEnumeration.FINISHED)
+                    && (activityStatus != ActivityStateEnumeration.FAILED)
+                    && (activityStatus != ActivityStateEnumeration.CANCELLED)
+                    && (applicationJobStatus != JobState.COMPLETE)) {
+
+                ActivityStatusType activityStatusType = getStatus(factoryClient, activityEpr);
+                applicationJobStatus = getApplicationJobStatus(activityStatusType);
+                sendNotification(processContext,processContext.getJobModel());
+                // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId,
+                // applicationJobStatus);
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {}
+
+                activityStatus = factoryClient.getActivityStatus(activityEpr);
+            }
+        } catch(Exception e) {
+            log.error("Error monitoring job status..");
+            throw e;
+        }
+    }
+
+    /**
+     * Persists the current status of the given job through {@code GFacUtils.saveJobStatus}.
+     *
+     * @param processContext process the job belongs to
+     * @param jobModel       job whose status is saved
+     * @throws GFacException if saving the status fails
+     */
+    private void sendNotification(final ProcessContext processContext, final JobModel jobModel) throws GFacException {
+        GFacUtils.saveJobStatus(processContext, jobModel);
+    }
+
+    /**
+     * Recovers the task by running {@code execute} again with the same context.
+     *
+     * @param taskContext context of the task to recover
+     * @return the status produced by the repeated execution
+     */
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        final TaskStatus rerunStatus = execute(taskContext);
+        return rerunStatus;
+    }
+
+    /**
+     * @return {@code TaskTypes.JOB_SUBMISSION}, the task type reported by this implementation
+     */
+    @Override
+    public TaskTypes getType() {
+        final TaskTypes type = TaskTypes.JOB_SUBMISSION;
+        return type;
+    }
+
+    /**
+     * Queries the factory for the status of a single activity.
+     *
+     * @param fc          factory client used for the request
+     * @param activityEpr endpoint of the activity to query
+     * @return the activity status taken from the first entry of the response
+     * @throws UnknownActivityIdentifierFault if the factory does not know the activity
+     */
+    protected ActivityStatusType getStatus(FactoryClient fc, EndpointReferenceType activityEpr)
+            throws UnknownActivityIdentifierFault {
+        GetActivityStatusesDocument request = GetActivityStatusesDocument.Factory.newInstance();
+        EndpointReferenceType[] activityIds = { activityEpr };
+        request.addNewGetActivityStatuses().setActivityIdentifierArray(activityIds);
+
+        GetActivityStatusesResponseDocument response = fc.getActivityStatuses(request);
+        // Exactly one identifier was sent, so only the first response entry is read.
+        return response.getGetActivityStatusesResponse().getResponseArray()[0].getActivityStatus();
+    }
+
+    /**
+     * Maps a BES activity status onto an Airavata {@code JobState}.
+     *
+     * When the first child element of the status belongs to the HPC-profile sub-state namespace,
+     * its local name decides the result; otherwise the coarse activity state is used. Anything
+     * that cannot be mapped, including a {@code null} status, yields {@code JobState.UNKNOWN}.
+     *
+     * @param activityStatus status returned by the factory, may be {@code null}
+     * @return the corresponding job state
+     */
+    private JobState getApplicationJobStatus(ActivityStatusType activityStatus) {
+        if (activityStatus == null) {
+            return JobState.UNKNOWN;
+        }
+        ActivityStateEnumeration.Enum coarseState = activityStatus.getState();
+        String subState = null;
+        XmlCursor cursor = activityStatus.newCursor();
+        try {
+            if (cursor.toFirstChild()
+                    && cursor.getName().getNamespaceURI().equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
+                subState = cursor.getName().getLocalPart();
+            }
+
+            if (subState == null) {
+                // No recognised sub-state element: fall back to the coarse BES state.
+                if (ActivityStateEnumeration.CANCELLED.equals(coarseState)) {
+                    return JobState.CANCELED;
+                }
+                if (ActivityStateEnumeration.FAILED.equals(coarseState)) {
+                    return JobState.FAILED;
+                }
+                if (ActivityStateEnumeration.FINISHED.equals(coarseState)) {
+                    return JobState.COMPLETE;
+                }
+                if (ActivityStateEnumeration.RUNNING.equals(coarseState)) {
+                    return JobState.ACTIVE;
+                }
+                return JobState.UNKNOWN;
+            }
+
+            if (subState.equalsIgnoreCase("Queued")
+                    || subState.equalsIgnoreCase("Starting")
+                    || subState.equalsIgnoreCase("Ready")) {
+                return JobState.QUEUED;
+            }
+            if (subState.equalsIgnoreCase("Staging-In")) {
+                return JobState.SUBMITTED;
+            }
+            if (subState.equalsIgnoreCase("FINISHED")) {
+                return JobState.COMPLETE;
+            }
+            if (subState.equalsIgnoreCase("Staging-Out") || subState.equalsIgnoreCase("Executing")) {
+                return JobState.ACTIVE;
+            }
+            if (subState.equalsIgnoreCase("FAILED")) {
+                return JobState.FAILED;
+            }
+            if (subState.equalsIgnoreCase("CANCELLED")) {
+                return JobState.CANCELED;
+            }
+            return JobState.UNKNOWN;
+        } finally {
+            if (cursor != null) {
+                cursor.dispose();
+            }
+        }
+    }
+
+    /**
+     * Terminates the activity whose endpoint reference was stored as the job description.
+     * The endpoint reference therefore has to be saved at submission time for cancel to work.
+     *
+     * @param processContext process whose job is to be cancelled
+     * @return {@code true} once the terminate request has been issued
+     * @throws GFacException wrapping any failure raised while parsing the reference or contacting the factory
+     */
+    public boolean cancelJob(ProcessContext processContext) throws GFacException {
+        try {
+            String storedEpr = processContext.getJobModel().getJobDescription();
+            EndpointReferenceType activityRef = EndpointReferenceType.Factory.parse(storedEpr);
+
+            JobSubmissionProtocol protocol = processContext.getJobSubmissionProtocol();
+            String interfaceId = processContext.getApplicationInterfaceDescription().getApplicationInterfaceId();
+            // The factory URL is only resolved for UNICORE submissions; otherwise it stays null.
+            String factoryUrl = protocol.equals(JobSubmissionProtocol.UNICORE)
+                    ? GFacUtils.getUnicoreJobSubmission(interfaceId).getUnicoreEndPointURL()
+                    : null;
+
+            EndpointReferenceType factoryRef = EndpointReferenceType.Factory.newInstance();
+            factoryRef.addNewAddress().setStringValue(factoryUrl);
+
+            new FactoryClient(factoryRef, secProperties).terminateActivity(activityRef);
+            return true;
+        } catch (Exception e) {
+            throw new GFacException(e.getLocalizedMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
new file mode 100644
index 0000000..beeb66e
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
@@ -0,0 +1,377 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.*;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class DefaultJobSubmissionTask implements JobSubmissionTask {
+	private static final Logger log = LoggerFactory.getLogger(DefaultJobSubmissionTask.class);
+	// Placeholder id stored on the job model when a submission yields no usable job id.
+	public static final String DEFAULT_JOB_ID = "DEFAULT_JOB_ID";
+	// Pause, in milliseconds, between checks while cancel() waits for the job id to appear.
+	// Never reassigned, hence final.
+	private static final int waitForProcessIdmillis = 5000;
+	// Same pause expressed in seconds; used for log output only.
+	private static final int pauseTimeInSec = waitForProcessIdmillis / 1000;
+
+	/**
+	 * No initialisation is required for this task; the supplied properties are not read.
+	 *
+	 * @param propertyMap task properties (unused)
+	 * @throws TaskException never thrown by this implementation
+	 */
+	@Override
+	public void init(Map<String, String> propertyMap) throws TaskException {
+		// intentionally empty
+	}
+
+    /**
+     * Creates the job script, submits it to the remote cluster and records the outcome.
+     *
+     * Flow:
+     * - A non-zero exit code or a reported submission failure stores the error on experiment,
+     *   process and task, publishes a FAILED task status and returns immediately.
+     * - A returned job id is saved as SUBMITTED (and QUEUED once the cluster knows the id);
+     *   gateway usage reporting is run when enabled on the compute resource.
+     * - Without a job id the id is looked up by job name, up to three times with a growing pause.
+     * - Any exception marks the task FAILED and attaches an error model to the task model.
+     *
+     * @param taskContext context of the task to execute
+     * @return the resulting task status, which is also stored on the context and published
+     */
+    @Override
+    public TaskStatus execute(TaskContext taskContext){
+        TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed.
+        try {
+            ProcessContext processContext = taskContext.getParentProcessContext();
+            JobModel jobModel = processContext.getJobModel();
+            jobModel.setTaskId(taskContext.getTaskId());
+            RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
+            GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext, taskContext);
+            groovyMap.getStringValue(Script.JOB_NAME).
+                    ifPresent(jobName -> jobModel.setJobName(jobName));
+            ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+            JobManagerConfiguration jConfig = null;
+            if (resourceJobManager != null) {
+                jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+            }
+            JobStatus jobStatus = new JobStatus();
+            File jobFile = GFacUtils.createJobFile(groovyMap, taskContext, jConfig);
+            if (jobFile != null && jobFile.exists()) {
+                jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+                JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
+                        processContext.getWorkingDir());
+                int exitCode = jobSubmissionOutput.getExitCode();
+                jobModel.setExitCode(exitCode);
+                jobModel.setStdErr(jobSubmissionOutput.getStdErr());
+                jobModel.setStdOut(jobSubmissionOutput.getStdOut());
+                String jobId = jobSubmissionOutput.getJobId();
+                String experimentId = taskContext.getExperimentId();
+                if (exitCode != 0 || jobSubmissionOutput.isJobSubmissionFailed()) {
+                    jobModel.setJobId(DEFAULT_JOB_ID);
+                    if (jobSubmissionOutput.isJobSubmissionFailed()) {
+                        List<JobStatus> statusList = new ArrayList<>();
+                        statusList.add(new JobStatus(JobState.FAILED));
+                        statusList.get(0).setReason(jobSubmissionOutput.getFailureReason());
+                        jobModel.setJobStatuses(statusList);
+                        GFacUtils.saveJobModel(processContext, jobModel);
+                        log.error("expId: {}, processid: {}, taskId: {} :- Job submission failed for job name {}",
+                                experimentId, taskContext.getProcessId(), taskContext.getTaskId(), jobModel.getJobName());
+                        ErrorModel errorModel = new ErrorModel();
+                        errorModel.setUserFriendlyMessage(jobSubmissionOutput.getFailureReason());
+                        errorModel.setActualErrorMessage(jobSubmissionOutput.getFailureReason());
+                        GFacUtils.saveExperimentError(processContext, errorModel);
+                        GFacUtils.saveProcessError(processContext, errorModel);
+                        GFacUtils.saveTaskError(taskContext, errorModel);
+                        taskStatus.setState(TaskState.FAILED);
+                        taskStatus.setReason("Job submission command didn't return a jobId");
+                        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                        taskContext.setTaskStatus(taskStatus);
+                    } else {
+                        String msg;
+                        GFacUtils.saveJobModel(processContext, jobModel);
+                        ErrorModel errorModel = new ErrorModel();
+                        if (exitCode != Integer.MIN_VALUE) {
+                            msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
+                                    processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
+                                    " return non zero exit code:" + exitCode + "  for JobName:" + jobModel.getJobName() +
+                                    ", with failure reason : " + jobSubmissionOutput.getFailureReason()
+                                    + " Hence changing job state to Failed." ;
+                            errorModel.setActualErrorMessage(jobSubmissionOutput.getFailureReason());
+                        } else {
+                            msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
+                                    processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
+                                    " doesn't  return valid job submission exit code for JobName:" + jobModel.getJobName() +
+                                    ", with failure reason : stdout ->" + jobSubmissionOutput.getStdOut() +
+                                    " stderr -> " + jobSubmissionOutput.getStdErr() + " Hence changing job state to Failed." ;
+                            errorModel.setActualErrorMessage(msg);
+                        }
+                        log.error(msg);
+                        errorModel.setUserFriendlyMessage(msg);
+                        GFacUtils.saveExperimentError(processContext, errorModel);
+                        GFacUtils.saveProcessError(processContext, errorModel);
+                        GFacUtils.saveTaskError(taskContext, errorModel);
+                        taskStatus.setState(TaskState.FAILED);
+                        taskStatus.setReason(msg);
+                        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                        taskContext.setTaskStatus(taskStatus);
+                    }
+                    try {
+                        GFacUtils.saveAndPublishTaskStatus(taskContext);
+                    } catch (GFacException e) {
+                        log.error("Error while saving task status", e);
+                    }
+                    return taskStatus;
+                } else if (jobId != null && !jobId.isEmpty()) {
+                    jobModel.setJobId(jobId);
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                    jobStatus.setJobState(JobState.SUBMITTED);
+                    ComputeResourceDescription computeResourceDescription = taskContext.getParentProcessContext()
+                            .getComputeResourceDescription();
+                    jobStatus.setReason("Successfully Submitted to " + computeResourceDescription.getHostName());
+                    jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                    if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
+                        jobStatus.setJobState(JobState.QUEUED);
+                        jobStatus.setReason("Verification step succeeded");
+                        jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                        jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                        GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                    }
+                    // doing gateway reporting
+                    if (computeResourceDescription.isGatewayUsageReporting()){
+                        String loadCommand = computeResourceDescription.getGatewayUsageModuleLoadCommand();
+                        String usageExecutable = computeResourceDescription.getGatewayUsageExecutable();
+                        ExperimentModel experiment = (ExperimentModel)taskContext.getParentProcessContext()
+                                .getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+                        String username = experiment.getUserName() + "@" + taskContext.getParentProcessContext().getUsageReportingGatewayId();
+                        RawCommandInfo rawCommandInfo = new RawCommandInfo(loadCommand + " && " + usageExecutable + " -gateway_user " +  username  +
+                                                                           " -submit_time \"`date '+%F %T %:z'`\"  -jobid " + jobId );
+                        remoteCluster.execute(rawCommandInfo);
+                    }
+                    taskStatus = new TaskStatus(TaskState.COMPLETED);
+                    taskStatus.setReason("Submitted job to compute resource");
+                    taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                } else {
+                    final int maxVerificationTries = 3;
+                    int verificationTryCount = 0;
+                    while (verificationTryCount++ < maxVerificationTries) {
+                        String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
+                        if (verifyJobId != null && !verifyJobId.isEmpty()) {
+                            // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+                            jobId = verifyJobId;
+                            jobModel.setJobId(jobId);
+                            GFacUtils.saveJobModel(processContext, jobModel);
+                            jobStatus.setJobState(JobState.QUEUED);
+                            jobStatus.setReason("Verification step succeeded");
+                            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                            jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                            GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                            taskStatus.setState(TaskState.COMPLETED);
+                            taskStatus.setReason("Submitted job to compute resource");
+                            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                            break;
+                        }
+                        // Only pause when another attempt follows; sleeping after the last try just delays the failure.
+                        if (verificationTryCount < maxVerificationTries) {
+                            log.info("Verify step return invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
+                            Thread.sleep(verificationTryCount * 10000);
+                        }
+                    }
+                }
+
+                if (jobId == null || jobId.isEmpty()) {
+                    jobModel.setJobId(DEFAULT_JOB_ID);
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                    String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+                            "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
+                            "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
+                    log.error(msg);
+                    ErrorModel errorModel = new ErrorModel();
+                    errorModel.setUserFriendlyMessage(msg);
+                    errorModel.setActualErrorMessage(msg);
+                    GFacUtils.saveExperimentError(processContext, errorModel);
+                    GFacUtils.saveProcessError(processContext, errorModel);
+                    GFacUtils.saveTaskError(taskContext, errorModel);
+                    taskStatus.setState(TaskState.FAILED);
+                    taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
+                    taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                } else {
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                }
+            } else {
+                taskStatus.setState(TaskState.FAILED);
+                if (jobFile == null) {
+                    taskStatus.setReason("JobFile is null");
+                } else {
+                    taskStatus.setReason("Job file doesn't exist");
+                }
+                // Timestamp the failure like every other failure path does.
+                taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            }
+
+        } catch (AppCatalogException e) {
+            markTaskFailed(taskContext, taskStatus, "Error while instantiating app catalog", e);
+        } catch (ApplicationSettingsException e) {
+            markTaskFailed(taskContext, taskStatus, "Error occurred while creating job descriptor", e);
+        } catch (GFacException e) {
+            markTaskFailed(taskContext, taskStatus, "Error occurred while submitting the job", e);
+        } catch (IOException e) {
+            markTaskFailed(taskContext, taskStatus, "Error while reading the content of the job file", e);
+        } catch (InterruptedException e) {
+            // Restore the flag so the owning thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            markTaskFailed(taskContext, taskStatus, "Error occurred while verifying the job submission", e);
+        } catch (Throwable e) {
+            markTaskFailed(taskContext, taskStatus, "JobSubmission failed", e);
+        }
+
+        taskContext.setTaskStatus(taskStatus);
+        try {
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+        } catch (GFacException e) {
+            log.error("Error while saving task status", e);
+        }
+        return taskStatus;
+    }
+
+    /**
+     * Logs the failure, flags the status as FAILED with the given reason and the current time,
+     * and attaches a matching error model to the task model.
+     */
+    private void markTaskFailed(TaskContext taskContext, TaskStatus taskStatus, String msg, Throwable cause) {
+        log.error(msg, cause);
+        taskStatus.setState(TaskState.FAILED);
+        taskStatus.setReason(msg);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        ErrorModel errorModel = new ErrorModel();
+        errorModel.setActualErrorMessage(cause.getMessage());
+        errorModel.setUserFriendlyMessage(msg);
+        taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+    }
+
+    /**
+     * Checks whether the remote cluster reports a known state for the given job id.
+     *
+     * @param remoteCluster cluster to query
+     * @param jobID         id returned by the submission
+     * @return {@code true} if a status was returned and its state is not {@code JobState.UNKNOWN}
+     * @throws GFacException if the status query fails
+     */
+    private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws GFacException {
+        JobStatus reported = remoteCluster.getJobStatus(jobID);
+        if (reported == null) {
+            return false;
+        }
+        return reported.getJobState() != JobState.UNKNOWN;
+    }
+
+    /**
+     * Looks up the remote job id by job name for the user of the cluster connection.
+     *
+     * A failed lookup is logged with its cause and reported as {@code null} so that the caller
+     * can retry.
+     *
+     * @param remoteCluster cluster to query
+     * @param jobDetails    job whose name is used for the lookup
+     * @return the job id returned by the cluster, or {@code null} if the lookup failed
+     */
+    private String verifyJobSubmission(RemoteCluster remoteCluster, JobModel jobDetails) {
+        String jobName = jobDetails.getJobName();
+        String jobId = null;
+        try {
+            jobId  = remoteCluster.getJobIdByJobName(jobName, remoteCluster.getServerInfo().getUserName());
+        } catch (GFacException e) {
+            // Keep the cause and the job name; without them the failure cannot be diagnosed from the log.
+            log.error("Error while verifying JobId from JobName " + jobName, e);
+        }
+        return jobId;
+    }
+
+
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+            ProcessContext processContext = taskContext.getParentProcessContext();
+            JobModel jobModel = processContext.getJobModel();
+            // original job failed before submitting
+            if (jobModel == null || jobModel.getJobId() == null ){
+                return execute(taskContext);
+            }else {
+	            // job is already submitted and monitor should handle the recovery
+	            return new TaskStatus(TaskState.COMPLETED);
+            }
+    }
+
+	/**
+	 * @return {@code TaskTypes.JOB_SUBMISSION}, the task type reported by this implementation
+	 */
+	@Override
+	public TaskTypes getType() {
+		final TaskTypes type = TaskTypes.JOB_SUBMISSION;
+		return type;
+	}
+
+	/**
+	 * Cancels the job recorded on the process context.
+	 *
+	 * While the process is EXECUTING and no job id is known yet, the method waits for the id to
+	 * appear. It then queries the job status (retrying up to six times with a growing pause)
+	 * and issues the cancel once a status has been obtained.
+	 *
+	 * @param taskcontext context of the task whose job is cancelled
+	 * @return the job status returned by the cluster's cancel operation
+	 * @throws TaskException if there is no job model, no job status could be obtained, the cluster
+	 *                       call fails, or the thread is interrupted while waiting
+	 */
+	@Override
+	public JobStatus cancel(TaskContext taskcontext) throws TaskException {
+		ProcessContext processContext = taskcontext.getParentProcessContext();
+		RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
+		JobModel jobModel = processContext.getJobModel();
+		if (jobModel == null) {
+			throw new TaskException("Couldn't complete cancel operation, JobModel is null in ProcessContext.");
+		}
+
+		try {
+			if (processContext.getProcessState() == ProcessState.EXECUTING) {
+				while (jobModel.getJobId() == null) {
+					log.info("Cancellation pause {} secs until process get jobId", pauseTimeInSec);
+					// An interrupt must end this wait; ignoring it would make the loop impossible to stop.
+					Thread.sleep(waitForProcessIdmillis);
+				}
+			}
+
+			int retryCount = 0;
+			JobStatus oldJobStatus = remoteCluster.getJobStatus(jobModel.getJobId());
+			while (oldJobStatus == null && retryCount <= 5) {
+				retryCount++;
+				Thread.sleep(retryCount * 1000);
+				oldJobStatus = remoteCluster.getJobStatus(jobModel.getJobId());
+			}
+			if (oldJobStatus == null) {
+				throw new TaskException("Cancel operation failed, Job status couldn't find in resource, JobId " +
+						jobModel.getJobId());
+			}
+			return remoteCluster.cancelJob(jobModel.getJobId());
+		} catch (InterruptedException e) {
+			// Restore the flag so the caller can still observe the interruption.
+			Thread.currentThread().interrupt();
+			throw new TaskException("Error while cancelling job " + jobModel.getJobId(), e);
+		} catch (GFacException e) {
+			throw new TaskException("Error while cancelling job " + jobModel.getJobId(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java
new file mode 100644
index 0000000..6a6a8c0
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java
@@ -0,0 +1,176 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+public class ForkJobSubmissionTask implements JobSubmissionTask {
+    private static final Logger log = LoggerFactory.getLogger(ForkJobSubmissionTask.class);
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+
+    }
+
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+        try {
+            ProcessContext processContext = taskContext.getParentProcessContext();
+            JobModel jobModel = processContext.getJobModel();
+            jobModel.setTaskId(taskContext.getTaskId());
+            RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
+            GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext, taskContext);
+            jobModel.setJobName(groovyMap.get(Script.JOB_NAME).toString());
+            ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+            JobManagerConfiguration jConfig = null;
+            if (resourceJobManager != null) {
+                jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+            }
+            JobStatus jobStatus = new JobStatus();
+	        File jobFile = GFacUtils.createJobFile(groovyMap, taskContext, jConfig);
+	        if (jobFile != null && jobFile.exists()) {
+                jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+	            JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
+			            processContext.getWorkingDir());
+	            jobModel.setExitCode(jobSubmissionOutput.getExitCode());
+	            jobModel.setStdErr(jobSubmissionOutput.getStdErr());
+	            jobModel.setStdOut(jobSubmissionOutput.getStdOut());
+	            String jobId = jobSubmissionOutput.getJobId();
+	            if (jobId != null && !jobId.isEmpty()) {
+                    jobModel.setJobId(jobId);
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                    jobStatus.setJobState(JobState.SUBMITTED);
+                    jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext()
+                            .getComputeResourceDescription().getHostName());
+                    jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                    taskStatus = new TaskStatus(TaskState.COMPLETED);
+                    taskStatus.setReason("Submitted job to compute resource");
+                }
+                if (jobId == null || jobId.isEmpty()) {
+                    String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+                            "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
+                            "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
+                    log.error(msg);
+                    ErrorModel errorModel = new ErrorModel();
+                    errorModel.setActualErrorMessage(msg);
+                    errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+                    GFacUtils.saveExperimentError(processContext, errorModel);
+                    GFacUtils.saveProcessError(processContext, errorModel);
+                    GFacUtils.saveTaskError(taskContext, errorModel);
+                    taskStatus.setState(TaskState.FAILED);
+                    taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
+                }else {
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                }
+            } else {
+                taskStatus.setState(TaskState.FAILED);
+                if (jobFile == null) {
+                    taskStatus.setReason("JobFile is null");
+                } else {
+                    taskStatus.setReason("Job file doesn't exist");
+                }
+            }
+        } catch (ApplicationSettingsException e) {
+            String msg = "Error occurred while creating job descriptor";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (AppCatalogException e) {
+            String msg = "Error while instantiating app catalog";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (GFacException e) {
+            String msg = "Error occurred while submitting the job";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (IOException e) {
+            String msg = "Error while reading the content of the job file";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        }
+        return taskStatus;
+    }
+
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        //TODO implement recovery scenario instead of calling execute.
+        return execute(taskContext);
+    }
+
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.JOB_SUBMISSION;
+	}
+
+	@Override
+	public JobStatus cancel(TaskContext taskcontext) {
+		// TODO - implement cancel with SSH Fork
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
new file mode 100644
index 0000000..aa2cbbd
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
@@ -0,0 +1,202 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Job submission task for locally executed jobs. The job is submitted through
+ * the process's job submission {@link RemoteCluster} and is treated as finished
+ * as soon as the submit call returns.
+ */
+public class LocalJobSubmissionTask implements JobSubmissionTask{
+    private static final Logger log = LoggerFactory.getLogger(LocalJobSubmissionTask.class);
+    private ProcessBuilder builder;
+
+    /**
+     * Nothing to initialise; the supplied properties are not read.
+     */
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+    }
+
+    /**
+     * Creates the job file, submits it and records both the SUBMITTED and the
+     * COMPLETE job status.
+     *
+     * @param taskContext context of the task being run; its parent process
+     *                    context supplies the job model and the remote cluster
+     * @return a status in state {@code COMPLETED} when the job file was created
+     *         and submitted, otherwise a status in state {@code FAILED}
+     *         carrying the reason
+     */
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+        try {
+            ProcessContext processContext = taskContext.getParentProcessContext();
+            JobModel jobModel = processContext.getJobModel();
+            jobModel.setTaskId(taskContext.getTaskId());
+
+            RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
+            GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext,taskContext);
+
+            // The job id is generated here instead of being read from the submission output.
+            String jobId = AiravataUtils.getId("JOB_ID_");
+            jobModel.setJobName(groovyMap.get(Script.JOB_NAME).toString());
+            jobModel.setJobId(jobId);
+
+            ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+            JobManagerConfiguration jConfig = null;
+
+            if (resourceJobManager != null) {
+                jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+            }
+
+            JobStatus jobStatus = new JobStatus();
+            File jobFile = GFacUtils.createJobFile(groovyMap, taskContext, jConfig);
+            if (jobFile != null && jobFile.exists()) {
+                jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+
+                GFacUtils.saveJobModel(processContext, jobModel);
+
+                JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
+                        processContext.getWorkingDir());
+
+                jobStatus.setJobState(JobState.SUBMITTED);
+                jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext()
+                        .getComputeResourceDescription().getHostName());
+                jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                //log job submit status
+                GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+
+                //for local, job gets completed synchronously
+                //so changing job status to complete
+
+                jobModel.setExitCode(jobSubmissionOutput.getExitCode());
+                jobModel.setStdErr(jobSubmissionOutput.getStdErr());
+                jobModel.setStdOut(jobSubmissionOutput.getStdOut());
+
+
+                jobModel.setJobId(jobId);
+                // The same JobStatus instance is reused and overwritten for the COMPLETE record.
+                jobStatus.setJobState(JobState.COMPLETE);
+                jobStatus.setReason("Successfully Completed " + taskContext.getParentProcessContext()
+                        .getComputeResourceDescription().getHostName());
+                jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                //log job complete status
+                GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+
+
+                taskStatus = new TaskStatus(TaskState.COMPLETED);
+                taskStatus.setReason("Submitted job to compute resource");
+
+            } else {
+                taskStatus.setState(TaskState.FAILED);
+                if (jobFile == null) {
+                    taskStatus.setReason("JobFile is null");
+                } else {
+                    taskStatus.setReason("Job file doesn't exist");
+                }
+            }
+
+        } catch (GFacException | IOException | AppCatalogException | ApplicationSettingsException e) {
+            String msg = "Error occurred while submitting a local job";
+            log.error(msg, e);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+            taskStatus.setState(TaskState.FAILED);
+        }
+        return taskStatus;
+    }
+
+    /**
+     * Recovery is not implemented; always returns {@code null}.
+     */
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        return null;
+    }
+
+    /**
+     * Builds the command line for the application: the executable path followed
+     * by each input's application argument and value, in ascending input order.
+     * Empty or {@code null} arguments and values are left out.
+     *
+     * @param processContext supplies the executable path and the process inputs
+     * @return the command tokens, starting with the executable path
+     */
+    private List<String> buildCommand(ProcessContext processContext) {
+        List<String> cmdList = new ArrayList<>();
+        cmdList.add(processContext.getApplicationDeploymentDescription().getExecutablePath());
+        List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
+        if (processInputs == null) {
+            // No inputs declared: the command consists of the executable only.
+            return cmdList;
+        }
+
+        // Sort a copy with a stable sort. A TreeSet keyed on input order alone would
+        // silently drop every input that shares its order with another one, and
+        // subtracting the two orders could overflow; Integer.compare cannot.
+        List<InputDataObjectType> sortedInputs = new ArrayList<>(processInputs);
+        Collections.sort(sortedInputs, new Comparator<InputDataObjectType>() {
+            @Override
+            public int compare(InputDataObjectType left, InputDataObjectType right) {
+                return Integer.compare(left.getInputOrder(), right.getInputOrder());
+            }
+        });
+
+        for (InputDataObjectType input : sortedInputs) {
+            String argument = input.getApplicationArgument();
+            if (argument != null && !argument.equals("")) {
+                cmdList.add(argument);
+            }
+
+            String value = input.getValue();
+            if (value != null && !value.equals("")) {
+                cmdList.add(value);
+            }
+        }
+        return cmdList;
+    }
+
+    /**
+     * Creates the {@link ProcessBuilder} for the given command and copies the
+     * deployment's environment settings into the builder's environment.
+     *
+     * @param app     deployment description providing the environment entries
+     * @param cmdList command tokens to run
+     */
+    private void initProcessBuilder(ApplicationDeploymentDescription app, List<String> cmdList){
+        builder = new ProcessBuilder(cmdList);
+
+        List<SetEnvPaths> setEnvironment = app.getSetEnvironment();
+        if (setEnvironment != null) {
+            // environment() returns the builder's live map, so one lookup is enough.
+            Map<String,String> builderEnv = builder.environment();
+            for (SetEnvPaths envPath : setEnvironment) {
+                builderEnv.put(envPath.getName(), envPath.getValue());
+            }
+        }
+    }
+
+    @Override
+    public TaskTypes getType() {
+        return TaskTypes.JOB_SUBMISSION;
+    }
+
+    /**
+     * Cancellation is not implemented yet; always returns {@code null}.
+     */
+    @Override
+    public JobStatus cancel(TaskContext taskcontext) {
+        // TODO - implement Local Job cancel
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
new file mode 100644
index 0000000..360464e
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
@@ -0,0 +1,7 @@
+package org.apache.airavata.worker.task.jobsubmission.utils;
+
+/**
+ * Placeholder utility class in the job submission task's {@code utils} package.
+ * It currently declares no fields or methods.
+ *
+ * Created by goshenoy on 4/12/17.
+ */
+public class JobSubmissionUtils {
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/sample
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/sample b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/sample
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java
index abc62aa..11f2c03 100644
--- a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java
@@ -19,8 +19,8 @@
  */
 package org.apache.airavata.worker.commons.cluster;
 
-import org.apache.airavata.gfac.core.JobManagerConfiguration;
-import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.commons.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.commons.utils.JobManagerConfiguration;
 
 public abstract class AbstractRemoteCluster implements RemoteCluster {
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
new file mode 100644
index 0000000..022cb79
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
@@ -0,0 +1,34 @@
+package org.apache.airavata.worker.commons.utils;
+
+import org.apache.airavata.worker.commons.cluster.OutputParser;
+import org.apache.airavata.worker.commons.cluster.RawCommandInfo;
+
+/**
+ * Contract through which a worker obtains the commands and settings of a
+ * resource job manager. Only signatures are declared here; the per-method notes
+ * below are read off the method names and should be confirmed against the
+ * implementations.
+ *
+ * Created by goshenoy on 4/12/17.
+ */
+public interface JobManagerConfiguration {
+
+    /**
+     * @param jobID identifier of the job to cancel
+     * @return the raw cancel command for that job
+     */
+    RawCommandInfo getCancelCommand(String jobID);
+
+    /**
+     * @return name of the job description template
+     */
+    String getJobDescriptionTemplateName();
+
+    /**
+     * @param jobID identifier of the job to monitor
+     * @return the raw monitor command for that job
+     */
+    RawCommandInfo getMonitorCommand(String jobID);
+
+    /**
+     * @param userName user name the monitor command is built for
+     * @return the raw user-based monitor command
+     */
+    RawCommandInfo getUserBasedMonitorCommand(String userName);
+
+    /**
+     * @param jobName  name of the job
+     * @param userName user name passed along with the job name
+     * @return the raw job-id monitor command (presumably used to look a job id
+     *         up by job name; confirm against implementations)
+     */
+    RawCommandInfo getJobIdMonitorCommand(String jobName, String userName);
+
+    /**
+     * @return the script extension
+     */
+    String getScriptExtension();
+
+    /**
+     * @param workingDirectory working directory passed to the submit command
+     * @param pbsFilePath      path of the job file to submit
+     * @return the raw submit command
+     */
+    RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath);
+
+    /**
+     * @return the output parser associated with this configuration
+     */
+    OutputParser getParser();
+
+    /**
+     * @return the installed path
+     */
+    String getInstalledPath();
+
+    /**
+     * @return the base cancel command
+     */
+    String getBaseCancelCommand();
+
+    /**
+     * @return the base monitor command
+     */
+    String getBaseMonitorCommand();
+
+    /**
+     * @return the base submit command
+     */
+    String getBaseSubmitCommand();
+}