You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by go...@apache.org on 2017/04/13 16:06:43 UTC
[1/2] airavata git commit: Add utility classes for worker
Repository: airavata
Updated Branches:
refs/heads/feature-workload-mgmt 993bc1d07 -> aeaf35b46
http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
new file mode 100644
index 0000000..2a104dc
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
@@ -0,0 +1,194 @@
+package org.apache.airavata.worker.commons.utils;
+
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.status.*;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.utils.Constants;
+import org.apache.airavata.worker.commons.context.ProcessContext;
+import org.apache.airavata.worker.commons.context.TaskContext;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Created by goshenoy on 4/12/17.
+ */
+public class WorkerUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(WorkerUtils.class);
+
+ public static void saveExperimentError(ProcessContext processContext, ErrorModel errorModel) throws WorkerException {
+ try {
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ String experimentId = processContext.getExperimentId();
+ errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
+ experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, experimentId);
+ } catch (RegistryException e) {
+ String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+ + " : - Error while updating experiment errors";
+ throw new WorkerException(msg, e);
+ }
+ }
+
+ public static void saveProcessError(ProcessContext processContext, ErrorModel errorModel) throws WorkerException {
+ try {
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
+ experimentCatalog.add(ExpCatChildDataType.PROCESS_ERROR, errorModel, processContext.getProcessId());
+ } catch (RegistryException e) {
+ String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+ + " : - Error while updating process errors";
+ throw new WorkerException(msg, e);
+ }
+ }
+
+ public static void saveTaskError(TaskContext taskContext, ErrorModel errorModel) throws WorkerException {
+ try {
+ ExperimentCatalog experimentCatalog = taskContext.getParentProcessContext().getExperimentCatalog();
+ String taskId = taskContext.getTaskId();
+ errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR"));
+ experimentCatalog.add(ExpCatChildDataType.TASK_ERROR, errorModel, taskId);
+ } catch (RegistryException e) {
+ String msg = "expId: " + taskContext.getParentProcessContext().getExperimentId() + " processId: " + taskContext.getParentProcessContext().getProcessId() + " taskId: " + taskContext.getTaskId()
+ + " : - Error while updating task errors";
+ throw new WorkerException(msg, e);
+ }
+ }
+
+ public static void handleProcessInterrupt(ProcessContext processContext) throws WorkerException {
+ if (processContext.isCancel()) {
+ ProcessStatus pStatus = new ProcessStatus(ProcessState.CANCELLING);
+ pStatus.setReason("Process Cancel triggered");
+ pStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ processContext.setProcessStatus(pStatus);
+ saveAndPublishProcessStatus(processContext);
+ // do cancel operation here
+
+ pStatus.setState(ProcessState.CANCELED);
+ processContext.setProcessStatus(pStatus);
+ pStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ saveAndPublishProcessStatus(processContext);
+ }else if (processContext.isHandOver()) {
+
+ } else {
+ logger.error("expId: {}, processId: {} :- Unknown process interrupt", processContext.getExperimentId(),
+ processContext.getProcessId());
+ }
+ }
+
+ public static JobModel getJobModel(ProcessContext processContext) throws RegistryException {
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ List<Object> objects = experimentCatalog.get(ExperimentCatalogModelType.JOB,
+ Constants.FieldConstants.JobConstants.PROCESS_ID, processContext.getProcessId());
+ List<JobModel> jobModels = new ArrayList<>();
+ JobModel jobModel = null;
+ if (objects != null) {
+ for (Object object : objects) {
+ jobModel = ((JobModel) object);
+ if (jobModel.getJobId() != null || !jobModel.equals("")) {
+ return jobModel;
+ }
+ }
+ }
+ return jobModel;
+ }
+
+ public static List<String> parseTaskDag(String taskDag) {
+ // TODO - parse taskDag and create taskId list
+ String[] tasks = taskDag.split(",");
+ return Arrays.asList(tasks);
+ }
+
+ public static void saveAndPublishTaskStatus(TaskContext taskContext) throws WorkerException {
+ try {
+ TaskState state = taskContext.getTaskState();
+ // first we save job jobModel to the registry for sa and then save the job status.
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ TaskStatus status = taskContext.getTaskStatus();
+ if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ }else {
+ status.setTimeOfStateChange(status.getTimeOfStateChange());
+ }
+ experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, status, taskContext.getTaskId());
+ TaskIdentifier identifier = new TaskIdentifier(taskContext.getTaskId(),
+ processContext.getProcessId(), processContext.getProcessModel().getExperimentId(),
+ processContext.getGatewayId());
+ TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(state,
+ identifier);
+ MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
+ (MessageType.TASK.name()), taskContext.getParentProcessContext().getGatewayId());
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ processContext.getStatusPublisher().publish(msgCtx);
+ } catch (Exception e) {
+ throw new WorkerException("Error persisting task status"
+ + e.getLocalizedMessage(), e);
+ }
+ }
+
+ public static void saveAndPublishProcessStatus(ProcessContext processContext) throws WorkerException {
+ try {
+ // first we save job jobModel to the registry for sa and then save the job status.
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ ProcessStatus status = processContext.getProcessStatus();
+ if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ }else {
+ status.setTimeOfStateChange(status.getTimeOfStateChange());
+ }
+ experimentCatalog.add(ExpCatChildDataType.PROCESS_STATUS, status, processContext.getProcessId());
+ ProcessIdentifier identifier = new ProcessIdentifier(processContext.getProcessId(),
+ processContext.getProcessModel().getExperimentId(),
+ processContext.getGatewayId());
+ ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
+ MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+ AiravataUtils.getId(MessageType.PROCESS.name()), processContext.getGatewayId());
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ processContext.getStatusPublisher().publish(msgCtx);
+ } catch (Exception e) {
+ throw new WorkerException("Error persisting process status"
+ + e.getLocalizedMessage(), e);
+ }
+ }
+
+ public static void saveJobStatus(ProcessContext processContext, JobModel jobModel) throws WorkerException {
+ try {
+ // first we save job jobModel to the registry for sa and then save the job status.
+ JobStatus jobStatus = null;
+ if(jobModel.getJobStatuses() != null)
+ jobStatus = jobModel.getJobStatuses().get(0);
+
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ List<JobStatus> statuses = new ArrayList<>();
+ statuses.add(jobStatus);
+ jobModel.setJobStatuses(statuses);
+ if (jobStatus.getTimeOfStateChange() == 0 || jobStatus.getTimeOfStateChange() > 0 ){
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ }else {
+ jobStatus.setTimeOfStateChange(jobStatus.getTimeOfStateChange());
+ }
+ CompositeIdentifier ids = new CompositeIdentifier(jobModel.getTaskId(), jobModel.getJobId());
+ experimentCatalog.add(ExpCatChildDataType.JOB_STATUS, jobStatus, ids);
+ JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), jobModel.getTaskId(),
+ processContext.getProcessId(), processContext.getProcessModel().getExperimentId(),
+ processContext.getGatewayId());
+ JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
+ MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
+ (MessageType.JOB.name()), processContext.getGatewayId());
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ processContext.getStatusPublisher().publish(msgCtx);
+ } catch (Exception e) {
+ throw new WorkerException("Error persisting job status"
+ + e.getLocalizedMessage(), e);
+ }
+ }
+}
[2/2] airavata git commit: Add utility classes for worker
Posted by go...@apache.org.
Add utility classes for worker
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/aeaf35b4
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/aeaf35b4
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/aeaf35b4
Branch: refs/heads/feature-workload-mgmt
Commit: aeaf35b4615374c00f7477de9ae46a36f69ed88b
Parents: 993bc1d
Author: Gourav Shenoy <go...@apache.org>
Authored: Thu Apr 13 12:06:40 2017 -0400
Committer: Gourav Shenoy <go...@apache.org>
Committed: Thu Apr 13 12:06:40 2017 -0400
----------------------------------------------------------------------
.../impl/AuroraJobSubmissionTask.java | 160 ++++++
.../impl/BESJobSubmissionTask.java | 501 +++++++++++++++++++
.../impl/DefaultJobSubmissionTask.java | 377 ++++++++++++++
.../impl/ForkJobSubmissionTask.java | 176 +++++++
.../impl/LocalJobSubmissionTask.java | 202 ++++++++
.../jobsubmission/utils/JobSubmissionUtils.java | 7 +
.../worker/task/jobsubmission/utils/sample | 0
.../commons/cluster/AbstractRemoteCluster.java | 4 +-
.../commons/utils/JobManagerConfiguration.java | 34 ++
.../worker/commons/utils/WorkerUtils.java | 194 +++++++
10 files changed, 1653 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/AuroraJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/AuroraJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/AuroraJobSubmissionTask.java
new file mode 100644
index 0000000..2e55fe7
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/AuroraJobSubmissionTask.java
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.airavata.cloud.aurora.client.AuroraThriftClient;
+import org.apache.airavata.cloud.aurora.client.bean.IdentityBean;
+import org.apache.airavata.cloud.aurora.client.bean.JobConfigBean;
+import org.apache.airavata.cloud.aurora.client.bean.JobKeyBean;
+import org.apache.airavata.cloud.aurora.client.bean.ProcessBean;
+import org.apache.airavata.cloud.aurora.client.bean.ResourceBean;
+import org.apache.airavata.cloud.aurora.client.bean.ResponseBean;
+import org.apache.airavata.cloud.aurora.client.bean.TaskConfigBean;
+import org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.GroovyMap;
+import org.apache.airavata.gfac.core.Script;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.AuroraUtils;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AuroraJobSubmissionTask implements JobSubmissionTask{
+
+ private static final Logger log = LoggerFactory.getLogger(AuroraJobSubmissionTask.class);
+
+ @Override
+ public JobStatus cancel(TaskContext taskcontext) throws TaskException {
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobState(JobState.ACTIVE);
+ return jobStatus;
+ }
+
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed.
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ JobModel jobModel = processContext.getJobModel();
+ jobModel.setTaskId(taskContext.getTaskId());
+ String jobIdAndName = "A" + GFacUtils.generateJobName();
+ jobModel.setJobName(jobIdAndName);
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobState(JobState.SUBMITTED);
+
+ try {
+ JobKeyBean jobKey = new JobKeyBean(AuroraUtils.ENVIRONMENT, AuroraUtils.ROLE, jobIdAndName);
+ IdentityBean owner = new IdentityBean(AuroraUtils.ROLE);
+ GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext, taskContext);
+ groovyMap.add(Script.JOB_SUBMITTER_COMMAND, "sh");
+ String templateFileName = GFacUtils.getTemplateFileName(ResourceJobManagerType.CLOUD);
+ String script = GFacUtils.generateScript(groovyMap, templateFileName);
+ Set<ProcessBean> processes = new LinkedHashSet<>();
+ ProcessBean process_1 = new ProcessBean("main_process", script, false);
+ processes.add(process_1);
+
+ groovyMap.getStringValue(Script.STANDARD_OUT_FILE)
+ .ifPresent(stdout -> {
+ ProcessBean stdOutProcess = new ProcessBean("stdout_copy_process", "cp .logs/main_process/0/stdout " + stdout, false);
+ processes.add(stdOutProcess);
+ });
+
+ groovyMap.getStringValue(Script.STANDARD_ERROR_FILE)
+ .ifPresent(stderr -> {
+ ProcessBean stdErrProcess = new ProcessBean("stderr_copy_process", "cp .logs/main_process/0/stderr " + stderr, false);
+ processes.add(stdErrProcess);
+ });
+
+ ResourceBean resources = new ResourceBean(1.5, 512, 512);
+
+ TaskConfigBean taskConfig = new TaskConfigBean("Airavata-Aurora-" + jobIdAndName, processes, resources);
+ JobConfigBean jobConfig = new JobConfigBean(jobKey, owner, taskConfig, AuroraUtils.CLUSTER);
+
+ String executorConfigJson = AuroraThriftClientUtil.getExecutorConfigJson(jobConfig);
+ log.info("Executor Config for Job {} , {}", jobIdAndName, executorConfigJson);
+
+ AuroraThriftClient client = AuroraThriftClient.getAuroraThriftClient();
+ ResponseBean response = client.createJob(jobConfig);
+ log.info("Response for job {}, {}", jobIdAndName, response);
+ jobModel.setJobDescription(resources.toString());
+
+ jobModel.setJobId(jobIdAndName);
+ jobStatus.setReason("Successfully Submitted");
+ jobModel.setJobStatuses(Arrays.asList(jobStatus ));
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.getParentProcessContext().setJobModel(jobModel);
+
+ GFacUtils.saveJobModel(processContext, jobModel);
+ GFacUtils.saveJobStatus(processContext, jobModel);
+ taskStatus.setReason("Successfully submitted job to Aurora");
+ } catch (Throwable e) {
+ String msg = "Error occurred while submitting Aurora job";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ }
+
+ taskContext.setTaskStatus(taskStatus);
+ try {
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+ } catch (GFacException e) {
+ log.error("Error while saving task status", e);
+ }
+ return taskStatus;
+ }
+
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ return execute(taskContext);
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.JOB_SUBMISSION;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
new file mode 100644
index 0000000..2035d75
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
@@ -0,0 +1,501 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import de.fzj.unicore.bes.client.ActivityClient;
+import de.fzj.unicore.bes.client.FactoryClient;
+import de.fzj.unicore.bes.faults.UnknownActivityIdentifierFault;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
+import eu.unicore.util.httpclient.DefaultClientConfiguration;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.gfac.impl.task.utils.bes.*;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
+import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.experiment.UserConfigurationDataModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.xmlbeans.XmlCursor;
+import org.ggf.schemas.bes.x2006.x08.besFactory.*;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3.x2005.x08.addressing.EndpointReferenceType;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+
+public class BESJobSubmissionTask implements JobSubmissionTask {
+ private static final Logger log = LoggerFactory.getLogger(BESJobSubmissionTask.class);
+ private DefaultClientConfiguration secProperties;
+
+ private String jobId;
+ private String hostName;
+ private String userName;
+ private String inputPath;
+ private int DEFAULT_SSH_PORT = 22;
+ private AuthenticationInfo authenticationInfo;
+
    /**
     * Cancellation of BES jobs is not implemented; always returns {@code null}.
     * NOTE(review): callers receive null rather than a JobStatus — confirm this is
     * handled upstream before relying on cancel for this task type.
     */
    @Override
    public JobStatus cancel(TaskContext taskcontext) throws TaskException {
        return null;
    }
+
    /** No initialization properties are consumed by this task. */
    @Override
    public void init(Map<String, String> propertyMap) throws TaskException {
    }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+ StorageClient sc = null;
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ // FIXME - use original output dir
+ setInputOutputLocations(processContext);
+ try {
+ // con't reuse if UserDN has been changed.
+ secProperties = getSecurityConfig(processContext);
+ // try secProperties = secProperties.clone() if we can't use already initialized ClientConfigurations.
+ } catch (GFacException e) {
+ String msg = "Unicorn security context initialization error";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ return taskStatus;
+ }
+
+ try {
+ JobSubmissionProtocol protocol = processContext.getJobSubmissionProtocol();
+ JobSubmissionInterface jobSubmissionInterface = GFacUtils.getPreferredJobSubmissionInterface(processContext);
+ String factoryUrl = null;
+ if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
+ UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(
+ jobSubmissionInterface.getJobSubmissionInterfaceId());
+ factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
+ }
+ EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance();
+ eprt.addNewAddress().setStringValue(factoryUrl);
+ String userDN = processContext.getProcessModel().getUserDn();
+
+ CreateActivityDocument cad = CreateActivityDocument.Factory.newInstance();
+
+ // create storage
+ StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, null);
+ sc = storageCreator.createStorage();
+
+ JobDefinitionType jobDefinition = JSDLGenerator.buildJSDLInstance(processContext, sc.getUrl()).getJobDefinition();
+ cad.addNewCreateActivity().addNewActivityDocument().setJobDefinition(jobDefinition);
+
+ log.info("Submitted JSDL: " + jobDefinition.getJobDescription());
+
+ // copy files to local
+ copyInputFilesToLocal(taskContext);
+ // upload files if any
+ DataTransferrer dt = new DataTransferrer(processContext, sc);
+ dt.uploadLocalFiles();
+
+ JobModel jobDetails = new JobModel();
+ jobDetails.setTaskId(taskContext.getTaskId());
+ jobDetails.setProcessId(taskContext.getProcessId());
+ FactoryClient factory = new FactoryClient(eprt, secProperties);
+
+ log.info("Activity Submitting to {} ... \n", factoryUrl);
+ CreateActivityResponseDocument response = factory.createActivity(cad);
+ log.info("Activity Submitted to {} ... \n", factoryUrl);
+
+ EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
+
+ log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
+
+ // factory.waitWhileActivityIsDone(activityEpr, 1000);
+ jobId = WSUtilities.extractResourceID(activityEpr);
+ if (jobId == null) {
+ jobId = new Long(Calendar.getInstance().getTimeInMillis())
+ .toString();
+ }
+ log.info("JobID: " + jobId);
+ jobDetails.setJobId(jobId);
+ jobDetails.setJobDescription(activityEpr.toString());
+ jobDetails.setJobStatuses(Arrays.asList(new JobStatus(JobState.SUBMITTED)));
+ processContext.setJobModel(jobDetails);
+ GFacUtils.saveJobModel(processContext, jobDetails);
+ GFacUtils.saveJobStatus(processContext, jobDetails);
+ log.info(formatStatusMessage(activityEpr.getAddress()
+ .getStringValue(), factory.getActivityStatus(activityEpr)
+ .toString()));
+
+ waitUntilDone(eprt, activityEpr, processContext, secProperties);
+
+ ActivityStatusType activityStatus = null;
+ activityStatus = getStatus(factory, activityEpr);
+ log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString()));
+ ActivityClient activityClient;
+ activityClient = new ActivityClient(activityEpr, secProperties);
+ // now use the activity working directory property
+ dt.setStorageClient(activityClient.getUspaceClient());
+
+ List<OutputDataObjectType> copyOutput = null;
+
+ if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
+ String error = activityStatus.getFault().getFaultcode()
+ .getLocalPart()
+ + "\n"
+ + activityStatus.getFault().getFaultstring()
+ + "\n EXITCODE: " + activityStatus.getExitCode();
+ log.error(error);
+
+ JobState applicationJobStatus = JobState.FAILED;
+ jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus)));
+ sendNotification(processContext, jobDetails);
+ try {Thread.sleep(5000);} catch (InterruptedException e) {}
+
+ //What if job is failed before execution and there are not stdouts generated yet?
+ log.debug("Downloading any standard output and error files, if they were produced.");
+ copyOutput = dt.downloadRemoteFiles();
+
+ } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
+ JobState applicationJobStatus = JobState.CANCELED;
+ jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus)));
+ GFacUtils.saveJobStatus(processContext, jobDetails);
+ throw new GFacException(
+ processContext.getExperimentId() + "Job Canceled");
+ } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ignored) {
+ }
+ JobState applicationJobStatus = JobState.COMPLETE;
+ jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus)));
+ GFacUtils.saveJobStatus(processContext, jobDetails);
+ log.info("Job Id: {}, exit code: {}, exit status: {}", jobDetails.getJobId(),
+ activityStatus.getExitCode(), ActivityStateEnumeration.FINISHED.toString());
+
+// if (activityStatus.getExitCode() == 0) {
+// } else {
+// dt.downloadStdOuts();
+// }
+ copyOutput = dt.downloadRemoteFiles();
+ }
+ if (copyOutput != null) {
+ copyOutputFilesToStorage(taskContext, copyOutput);
+ for (OutputDataObjectType outputDataObjectType : copyOutput) {
+ GFacUtils.saveExperimentOutput(processContext, outputDataObjectType.getName(), outputDataObjectType.getValue());
+ }
+ }
+// dt.publishFinalOutputs();
+ taskStatus.setState(TaskState.COMPLETED);
+ } catch (AppCatalogException e) {
+ log.error("Error while retrieving UNICORE job submission.." , e);
+ taskStatus.setState(TaskState.FAILED);
+ } catch (Exception e) {
+ log.error("BES task failed... ", e);
+ taskStatus.setState(TaskState.FAILED);
+ }
+
+ return taskStatus;
+ }
+
+ private void copyOutputFilesToStorage(TaskContext taskContext, List<OutputDataObjectType> copyOutput) throws GFacException {
+ ProcessContext pc = taskContext.getParentProcessContext();
+ String remoteFilePath = null, fileName = null, localFilePath = null;
+ try {
+ authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+ ServerInfo serverInfo = pc.getComputeResourceServerInfo();
+ Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+ for (OutputDataObjectType output : copyOutput) {
+ switch (output.getType()) {
+ case STDERR: case STDOUT: case STRING: case URI:
+ localFilePath = output.getValue();
+ if (localFilePath.contains("://")) {
+ localFilePath = localFilePath.substring(localFilePath.indexOf("://") + 2, localFilePath.length());
+ }
+ fileName = localFilePath.substring(localFilePath.lastIndexOf("/") + 1);
+ URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
+ remoteFilePath = destinationURI.getPath();
+ log.info("SCP local file :{} -> from remote :{}", localFilePath, remoteFilePath);
+ SSHUtils.scpTo(localFilePath, remoteFilePath, sshSession);
+ output.setValue(destinationURI.toString());
+ break;
+ default:
+ break;
+ }
+ }
+ } catch (IOException | JSchException | SSHApiException | URISyntaxException | CredentialStoreException e) {
+ log.error("Error while coping local file " + localFilePath + " to remote " + remoteFilePath, e);
+ throw new GFacException("Error while scp output files to remote storage file location", e);
+ }
+ }
+
+    /**
+     * Downloads URI-typed process inputs from the gateway storage resource to the
+     * local input directory over SCP, rewriting each input's value to a local file URI.
+     * Also resolves the storage host name and root path (stored in the {@code hostName}
+     * and {@code inputPath} fields) that later uploads rely on.
+     *
+     * @param taskContext current task (provides the parent process context)
+     * @throws GFacException if the storage resource is missing, authentication fails,
+     *                       or an SCP transfer cannot be completed
+     */
+    private void copyInputFilesToLocal(TaskContext taskContext) throws GFacException {
+        ProcessContext pc = taskContext.getParentProcessContext();
+        StorageResourceDescription storageResource = pc.getStorageResource();
+        if (storageResource != null) {
+            hostName = storageResource.getHostName();
+        } else {
+            throw new GFacException("Storage Resource is null");
+        }
+        // Normalize the storage root so later concatenation can assume a trailing separator.
+        inputPath = pc.getStorageFileSystemRootLocation();
+        inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator);
+
+        String remoteFilePath = null, fileName = null, localFilePath = null;
+        URI remoteFileURI = null;
+        try {
+            authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+            ServerInfo serverInfo = pc.getStorageResourceServerInfo();
+            Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+
+            List<InputDataObjectType> processInputs = pc.getProcessModel().getProcessInputs();
+            for (InputDataObjectType input : processInputs) {
+                if (input.getType() == DataType.URI) {
+                    remoteFileURI = new URI(input.getValue());
+                    remoteFilePath = remoteFileURI.getPath();
+                    fileName = remoteFilePath.substring(remoteFilePath.lastIndexOf("/") + 1);
+                    localFilePath = pc.getInputDir() + File.separator + fileName;
+                    log.info("SCP remote file :{} -> to local :{}", remoteFilePath, localFilePath);
+                    SSHUtils.scpFrom(remoteFilePath, localFilePath, sshSession);
+                    // "file:/" + absolute path yields the file://... style URI consumed downstream
+                    input.setValue("file:/" + localFilePath);
+                }
+            }
+        } catch (IOException | JSchException | SSHApiException | URISyntaxException e) {
+            log.error("Error while copying remote file " + remoteFilePath + " to local " + localFilePath, e);
+            throw new GFacException("Error while scp input files to local file location", e);
+        } catch (CredentialStoreException e) {
+            String msg = "Authentication issue, make sure you are passing valid credential token";
+            log.error(msg, e);
+            throw new GFacException(msg, e);
+        }
+    }
+
+    /**
+     * Points the process input and output directories at a per-process scratch
+     * directory under the JVM temp dir, creating it (and any missing parents) first.
+     */
+    private void setInputOutputLocations(ProcessContext processContext) {
+        String localPath = System.getProperty("java.io.tmpdir") + File.separator + processContext.getProcessId();
+        File localDir = new File(localPath);
+        // mkdirs() also creates missing parents; warn on failure instead of silently
+        // ignoring the return value as the previous mkdir() call did.
+        if (!localDir.mkdirs() && !localDir.isDirectory()) {
+            log.warn("Could not create local scratch directory: {}", localPath);
+        }
+
+        processContext.setInputDir(localPath);
+        processContext.setOutputDir(localPath);
+    }
+
+ /**
+ * Builds the UNICORE client security configuration for this process from the
+ * user configuration data stored in the experiment catalog.
+ *
+ * @param pc process context supplying the experiment catalog and experiment id
+ * @return the client configuration (certificate-based when isGenerateCert() is true)
+ * @throws GFacException if registry access or configuration retrieval fails
+ */
+ private DefaultClientConfiguration getSecurityConfig(ProcessContext pc) throws GFacException {
+ DefaultClientConfiguration clientConfig = null;
+ try {
+ UNICORESecurityContext unicoreSecurityContext = SecurityUtils.getSecurityContext(pc);
+ UserConfigurationDataModel userConfigDataModel = (UserConfigurationDataModel) pc.getExperimentCatalog().
+ get(ExperimentCatalogModelType.USER_CONFIGURATION_DATA, pc.getExperimentId());
+ // FIXME - remove following setter lines, and use original value comes with user configuration data model.
+ // NOTE(review): this hard-codes certificate generation ON, overriding whatever the
+ // user configured — the branch below is therefore always the two-arg call.
+ userConfigDataModel.setGenerateCert(true);
+// userConfigDataModel.setUserDN("CN=swus3, O=Ultrascan Gateway, C=DE");
+ if (userConfigDataModel.isGenerateCert()) {
+ clientConfig = unicoreSecurityContext.getDefaultConfiguration(false, userConfigDataModel);
+ } else {
+ clientConfig = unicoreSecurityContext.getDefaultConfiguration(false);
+ }
+ } catch (RegistryException e) {
+ throw new GFacException("Error! reading user configuration data from registry", e);
+ } catch (ApplicationSettingsException e) {
+ throw new GFacException("Error! retrieving default client configurations", e);
+ }
+
+ return clientConfig;
+ }
+
+    /** Builds a one-line, human-readable status message for the given activity. */
+    protected String formatStatusMessage(String activityUrl, String status) {
+        return "Activity " + activityUrl + " is " + status + ".\n";
+    }
+
+    /**
+     * Polls the BES factory service until the activity reaches a terminal state
+     * (FINISHED, FAILED or CANCELLED) or the mapped application job state becomes
+     * COMPLETE, persisting a job-status update on every poll iteration.
+     *
+     * @param factoryEpr     endpoint of the BES factory service
+     * @param activityEpr    endpoint of the activity (job) being monitored
+     * @param processContext process whose job model is updated on each poll
+     * @param secProperties  client security configuration used to talk to the factory
+     * @throws Exception if a status query against the factory service fails
+     */
+    protected void waitUntilDone(EndpointReferenceType factoryEpr, EndpointReferenceType activityEpr, ProcessContext processContext, DefaultClientConfiguration secProperties) throws Exception {
+
+        try {
+            FactoryClient factoryClient = new FactoryClient(factoryEpr, secProperties);
+            JobState applicationJobStatus = null;
+
+            ActivityStateEnumeration.Enum activityStatus = factoryClient.getActivityStatus(activityEpr);
+            while ((activityStatus != ActivityStateEnumeration.FINISHED)
+                    && (activityStatus != ActivityStateEnumeration.FAILED)
+                    && (activityStatus != ActivityStateEnumeration.CANCELLED)
+                    && (applicationJobStatus != JobState.COMPLETE)) {
+
+                ActivityStatusType activityStatusType = getStatus(factoryClient, activityEpr);
+                applicationJobStatus = getApplicationJobStatus(activityStatusType);
+                sendNotification(processContext, processContext.getJobModel());
+                try {
+                    Thread.sleep(5000); // poll interval
+                } catch (InterruptedException ignored) {
+                    // Deliberately ignored so monitoring continues until the activity
+                    // terminates. NOTE(review): consider restoring the interrupt flag.
+                }
+
+                activityStatus = factoryClient.getActivityStatus(activityEpr);
+            }
+        } catch (Exception e) {
+            // include the cause so monitoring failures are diagnosable (was logged without it)
+            log.error("Error monitoring job status..", e);
+            throw e;
+        }
+    }
+
+ // Persists the current job status for the process; despite the name this simply
+ // delegates to GFacUtils.saveJobStatus — presumably that also publishes a
+ // status-change event; confirm in GFacUtils.
+ private void sendNotification(ProcessContext processContext, JobModel jobModel) throws GFacException {
+ GFacUtils.saveJobStatus(processContext, jobModel);
+ }
+
+ // Recovery strategy: simply re-run the submission from scratch. NOTE(review): this
+ // is only safe if the original attempt never reached the remote service — TODO
+ // confirm idempotency of execute() before relying on this for crash recovery.
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ return execute(taskContext);
+ }
+
+ // Identifies this task implementation as a job-submission task to the framework.
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.JOB_SUBMISSION;
+ }
+
+    /**
+     * Queries the BES factory for the current status of a single activity.
+     *
+     * @param fc          client bound to the factory service
+     * @param activityEpr endpoint reference of the activity to query
+     * @return the activity's current status element
+     * @throws UnknownActivityIdentifierFault if the factory does not know the activity
+     */
+    protected ActivityStatusType getStatus(FactoryClient fc, EndpointReferenceType activityEpr)
+            throws UnknownActivityIdentifierFault {
+        GetActivityStatusesDocument request = GetActivityStatusesDocument.Factory.newInstance();
+        request.addNewGetActivityStatuses()
+                .setActivityIdentifierArray(new EndpointReferenceType[]{activityEpr});
+        GetActivityStatusesResponseDocument response = fc.getActivityStatuses(request);
+        return response.getGetActivityStatusesResponse().getResponseArray()[0].getActivityStatus();
+    }
+
+    /**
+     * Maps a BES ActivityStatusType onto the Airavata JobState. If the status carries
+     * a child element in the HPC Profile namespace (hpcp/2007/01/fs) its local name
+     * takes precedence; otherwise the coarse BES activity state is mapped. Anything
+     * unrecognised yields UNKNOWN.
+     */
+    private JobState getApplicationJobStatus(ActivityStatusType activityStatus) {
+        if (activityStatus == null) {
+            return JobState.UNKNOWN;
+        }
+        ActivityStateEnumeration.Enum besState = activityStatus.getState();
+        XmlCursor cursor = activityStatus.newCursor();
+        try {
+            String hpcpStatus = null;
+            if (cursor.toFirstChild()
+                    && cursor.getName().getNamespaceURI().equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
+                hpcpStatus = cursor.getName().getLocalPart();
+            }
+            if (hpcpStatus == null) {
+                // No fine-grained HPC-profile state: fall back to the BES activity state.
+                if (ActivityStateEnumeration.CANCELLED.equals(besState)) {
+                    return JobState.CANCELED;
+                } else if (ActivityStateEnumeration.FAILED.equals(besState)) {
+                    return JobState.FAILED;
+                } else if (ActivityStateEnumeration.FINISHED.equals(besState)) {
+                    return JobState.COMPLETE;
+                } else if (ActivityStateEnumeration.RUNNING.equals(besState)) {
+                    return JobState.ACTIVE;
+                }
+            } else if (hpcpStatus.equalsIgnoreCase("Queued")
+                    || hpcpStatus.equalsIgnoreCase("Starting")
+                    || hpcpStatus.equalsIgnoreCase("Ready")) {
+                return JobState.QUEUED;
+            } else if (hpcpStatus.equalsIgnoreCase("Staging-In")) {
+                return JobState.SUBMITTED;
+            } else if (hpcpStatus.equalsIgnoreCase("FINISHED")) {
+                return JobState.COMPLETE;
+            } else if (hpcpStatus.equalsIgnoreCase("Staging-Out")
+                    || hpcpStatus.equalsIgnoreCase("Executing")) {
+                return JobState.ACTIVE;
+            } else if (hpcpStatus.equalsIgnoreCase("FAILED")) {
+                return JobState.FAILED;
+            } else if (hpcpStatus.equalsIgnoreCase("CANCELLED")) {
+                return JobState.CANCELED;
+            }
+            return JobState.UNKNOWN;
+        } finally {
+            cursor.dispose();
+        }
+    }
+
+ /**
+ * Cancels (terminates) the remote BES activity for this process. The activity
+ * EndpointReference must have been saved earlier into the job description for
+ * cancel to work.
+ *
+ * @param processContext process whose job should be cancelled
+ * @return true when the terminate call was issued successfully
+ * @throws GFacException if the EPR cannot be parsed or the terminate call fails
+ */
+ public boolean cancelJob(ProcessContext processContext) throws GFacException {
+ try {
+ String activityEpr = processContext.getJobModel().getJobDescription();
+ // initSecurityProperties(processContext);
+ EndpointReferenceType eprt = EndpointReferenceType.Factory
+ .parse(activityEpr);
+ JobSubmissionProtocol protocol = processContext.getJobSubmissionProtocol();
+ String interfaceId = processContext.getApplicationInterfaceDescription().getApplicationInterfaceId();
+ String factoryUrl = null;
+ // NOTE(review): factoryUrl stays null when the protocol is not UNICORE, which
+ // would leave the address below null — confirm this path is UNICORE-only.
+ if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
+ UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId);
+ factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
+ }
+ EndpointReferenceType epr = EndpointReferenceType.Factory
+ .newInstance();
+ epr.addNewAddress().setStringValue(factoryUrl);
+
+ // secProperties is presumably a field initialized during submission — verify
+ // it is set before cancel is invoked.
+ FactoryClient factory = new FactoryClient(epr, secProperties);
+ factory.terminateActivity(eprt);
+ return true;
+ } catch (Exception e) {
+ throw new GFacException(e.getLocalizedMessage(), e);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
new file mode 100644
index 0000000..beeb66e
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
@@ -0,0 +1,377 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.*;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Submits a batch job to a remote cluster through its resource job manager
+ * (SLURM/PBS/etc.), verifies that the job actually reached the scheduler, records
+ * job/experiment state transitions in the registry, and optionally reports gateway
+ * usage. Also supports cancelling a submitted job.
+ */
+public class DefaultJobSubmissionTask implements JobSubmissionTask {
+    private static final Logger log = LoggerFactory.getLogger(DefaultJobSubmissionTask.class);
+    /** Placeholder job id recorded when submission did not yield a real scheduler id. */
+    public static final String DEFAULT_JOB_ID = "DEFAULT_JOB_ID";
+    // How long cancel() waits between checks for the job id to appear.
+    private static int waitForProcessIdmillis = 5000;
+    private static int pauseTimeInSec = waitForProcessIdmillis / 1000;
+
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+        // no configuration required
+    }
+
+    /**
+     * Generates the job script, submits it, and verifies submission. Any failure path
+     * records an ErrorModel against the experiment/process/task and returns FAILED.
+     */
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed.
+        try {
+            ProcessContext processContext = taskContext.getParentProcessContext();
+            JobModel jobModel = processContext.getJobModel();
+            jobModel.setTaskId(taskContext.getTaskId());
+            RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
+            GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext, taskContext);
+            groovyMap.getStringValue(Script.JOB_NAME).
+                    ifPresent(jobName -> jobModel.setJobName(jobName));
+            ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+            JobManagerConfiguration jConfig = null;
+            if (resourceJobManager != null) {
+                jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+            }
+            JobStatus jobStatus = new JobStatus();
+            File jobFile = GFacUtils.createJobFile(groovyMap, taskContext, jConfig);
+            if (jobFile != null && jobFile.exists()) {
+                jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+                JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
+                        processContext.getWorkingDir());
+                int exitCode = jobSubmissionOutput.getExitCode();
+                jobModel.setExitCode(exitCode);
+                jobModel.setStdErr(jobSubmissionOutput.getStdErr());
+                jobModel.setStdOut(jobSubmissionOutput.getStdOut());
+                String jobId = jobSubmissionOutput.getJobId();
+                String experimentId = taskContext.getExperimentId();
+                if (exitCode != 0 || jobSubmissionOutput.isJobSubmissionFailed()) {
+                    // Submission command failed outright or returned a bad exit code.
+                    jobModel.setJobId(DEFAULT_JOB_ID);
+                    if (jobSubmissionOutput.isJobSubmissionFailed()) {
+                        List<JobStatus> statusList = new ArrayList<>();
+                        statusList.add(new JobStatus(JobState.FAILED));
+                        statusList.get(0).setReason(jobSubmissionOutput.getFailureReason());
+                        jobModel.setJobStatuses(statusList);
+                        GFacUtils.saveJobModel(processContext, jobModel);
+                        log.error("expId: {}, processid: {}, taskId: {} :- Job submission failed for job name {}",
+                                experimentId, taskContext.getProcessId(), taskContext.getTaskId(), jobModel.getJobName());
+                        ErrorModel errorModel = new ErrorModel();
+                        errorModel.setUserFriendlyMessage(jobSubmissionOutput.getFailureReason());
+                        errorModel.setActualErrorMessage(jobSubmissionOutput.getFailureReason());
+                        GFacUtils.saveExperimentError(processContext, errorModel);
+                        GFacUtils.saveProcessError(processContext, errorModel);
+                        GFacUtils.saveTaskError(taskContext, errorModel);
+                        taskStatus.setState(TaskState.FAILED);
+                        taskStatus.setReason("Job submission command didn't return a jobId");
+                        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                        taskContext.setTaskStatus(taskStatus);
+                    } else {
+                        String msg;
+                        GFacUtils.saveJobModel(processContext, jobModel);
+                        ErrorModel errorModel = new ErrorModel();
+                        if (exitCode != Integer.MIN_VALUE) {
+                            msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
+                                    processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
+                                    " return non zero exit code:" + exitCode + " for JobName:" + jobModel.getJobName() +
+                                    ", with failure reason : " + jobSubmissionOutput.getFailureReason()
+                                    + " Hence changing job state to Failed." ;
+                            errorModel.setActualErrorMessage(jobSubmissionOutput.getFailureReason());
+                        } else {
+                            msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
+                                    processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
+                                    " doesn't return valid job submission exit code for JobName:" + jobModel.getJobName() +
+                                    ", with failure reason : stdout ->" + jobSubmissionOutput.getStdOut() +
+                                    " stderr -> " + jobSubmissionOutput.getStdErr() + " Hence changing job state to Failed." ;
+                            errorModel.setActualErrorMessage(msg);
+                        }
+                        log.error(msg);
+                        errorModel.setUserFriendlyMessage(msg);
+                        GFacUtils.saveExperimentError(processContext, errorModel);
+                        GFacUtils.saveProcessError(processContext, errorModel);
+                        GFacUtils.saveTaskError(taskContext, errorModel);
+                        taskStatus.setState(TaskState.FAILED);
+                        taskStatus.setReason(msg);
+                        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                        taskContext.setTaskStatus(taskStatus);
+                    }
+                    try {
+                        GFacUtils.saveAndPublishTaskStatus(taskContext);
+                    } catch (GFacException e) {
+                        log.error("Error while saving task status", e);
+                    }
+                    return taskStatus;
+                } else if (jobId != null && !jobId.isEmpty()) {
+                    // Happy path: we got a scheduler job id back.
+                    jobModel.setJobId(jobId);
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                    jobStatus.setJobState(JobState.SUBMITTED);
+                    ComputeResourceDescription computeResourceDescription = taskContext.getParentProcessContext()
+                            .getComputeResourceDescription();
+                    jobStatus.setReason("Successfully Submitted to " + computeResourceDescription.getHostName());
+                    jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                    if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
+                        jobStatus.setJobState(JobState.QUEUED);
+                        jobStatus.setReason("Verification step succeeded");
+                        jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                        jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                        GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                    }
+                    // doing gateway reporting
+                    if (computeResourceDescription.isGatewayUsageReporting()){
+                        String loadCommand = computeResourceDescription.getGatewayUsageModuleLoadCommand();
+                        String usageExecutable = computeResourceDescription.getGatewayUsageExecutable();
+                        ExperimentModel experiment = (ExperimentModel)taskContext.getParentProcessContext()
+                                .getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+                        String username = experiment.getUserName() + "@" + taskContext.getParentProcessContext().getUsageReportingGatewayId();
+                        RawCommandInfo rawCommandInfo = new RawCommandInfo(loadCommand + " && " + usageExecutable + " -gateway_user " + username +
+                                " -submit_time \"`date '+%F %T %:z'`\" -jobid " + jobId );
+                        remoteCluster.execute(rawCommandInfo);
+                    }
+                    taskStatus = new TaskStatus(TaskState.COMPLETED);
+                    taskStatus.setReason("Submitted job to compute resource");
+                    taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                } else {
+                    // Submission succeeded but no id was parsed: try to recover the id by job name.
+                    int verificationTryCount = 0;
+                    while (verificationTryCount++ < 3) {
+                        String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
+                        if (verifyJobId != null && !verifyJobId.isEmpty()) {
+                            // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+                            jobId = verifyJobId;
+                            jobModel.setJobId(jobId);
+                            GFacUtils.saveJobModel(processContext, jobModel);
+                            jobStatus.setJobState(JobState.QUEUED);
+                            jobStatus.setReason("Verification step succeeded");
+                            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                            jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                            GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                            taskStatus.setState(TaskState.COMPLETED);
+                            taskStatus.setReason("Submitted job to compute resource");
+                            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                            break;
+                        }
+                        log.info("Verify step return invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
+                        Thread.sleep(verificationTryCount * 10000);
+                    }
+                }
+
+                if (jobId == null || jobId.isEmpty()) {
+                    // Neither submission nor verification produced a job id — fail the experiment.
+                    jobModel.setJobId(DEFAULT_JOB_ID);
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                    String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+                            "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
+                            "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
+                    log.error(msg);
+                    ErrorModel errorModel = new ErrorModel();
+                    errorModel.setUserFriendlyMessage(msg);
+                    errorModel.setActualErrorMessage(msg);
+                    GFacUtils.saveExperimentError(processContext, errorModel);
+                    GFacUtils.saveProcessError(processContext, errorModel);
+                    GFacUtils.saveTaskError(taskContext, errorModel);
+                    taskStatus.setState(TaskState.FAILED);
+                    taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
+                    taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                } else {
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                }
+            } else {
+                taskStatus.setState(TaskState.FAILED);
+                if (jobFile == null) {
+                    taskStatus.setReason("JobFile is null");
+                } else {
+                    taskStatus.setReason("Job file doesn't exist");
+                }
+            }
+
+        } catch (AppCatalogException e) {
+            String msg = "Error while instantiating app catalog";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (ApplicationSettingsException e) {
+            String msg = "Error occurred while creating job descriptor";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (GFacException e) {
+            String msg = "Error occurred while submitting the job";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (IOException e) {
+            String msg = "Error while reading the content of the job file";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (InterruptedException e) {
+            String msg = "Error occurred while verifying the job submission";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (Throwable e) {
+            String msg = "JobSubmission failed";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        }
+
+        taskContext.setTaskStatus(taskStatus);
+        try {
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+        } catch (GFacException e) {
+            log.error("Error while saving task status", e);
+        }
+        return taskStatus;
+    }
+
+    /** Returns true when the scheduler reports any known state for the given job id. */
+    private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws GFacException {
+        JobStatus status = remoteCluster.getJobStatus(jobID);
+        return status != null && status.getJobState() != JobState.UNKNOWN;
+    }
+
+    /** Attempts to recover a scheduler job id by looking it up by job name; null on failure. */
+    private String verifyJobSubmission(RemoteCluster remoteCluster, JobModel jobDetails) {
+        String jobName = jobDetails.getJobName();
+        String jobId = null;
+        try {
+            jobId = remoteCluster.getJobIdByJobName(jobName, remoteCluster.getServerInfo().getUserName());
+        } catch (GFacException e) {
+            // fixed: include the exception so the lookup failure is diagnosable
+            log.error("Error while verifying JobId from JobName", e);
+        }
+        return jobId;
+    }
+
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        JobModel jobModel = processContext.getJobModel();
+        // original job failed before submitting
+        if (jobModel == null || jobModel.getJobId() == null ){
+            return execute(taskContext);
+        }else {
+            // job is already submitted and monitor should handle the recovery
+            return new TaskStatus(TaskState.COMPLETED);
+        }
+    }
+
+    @Override
+    public TaskTypes getType() {
+        return TaskTypes.JOB_SUBMISSION;
+    }
+
+    /**
+     * Cancels the submitted job. Waits (indefinitely — NOTE(review): consider a bound)
+     * for the job id to appear while the process is still executing, retries the status
+     * query up to five times, then issues the scheduler cancel.
+     *
+     * @throws TaskException if the job status can never be resolved or cancel fails
+     */
+    @Override
+    public JobStatus cancel(TaskContext taskcontext) throws TaskException {
+        ProcessContext processContext = taskcontext.getParentProcessContext();
+        RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
+        JobModel jobModel = processContext.getJobModel();
+        int retryCount = 0;
+        if (jobModel != null) {
+            if (processContext.getProcessState() == ProcessState.EXECUTING) {
+                while (jobModel.getJobId() == null) {
+                    log.info("Cancellation pause {} secs until process get jobId", pauseTimeInSec);
+                    try {
+                        Thread.sleep(waitForProcessIdmillis);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                }
+            }
+
+            try {
+                JobStatus oldJobStatus = remoteCluster.getJobStatus(jobModel.getJobId());
+                while (oldJobStatus == null && retryCount <= 5) {
+                    retryCount++;
+                    Thread.sleep(retryCount * 1000);
+                    oldJobStatus = remoteCluster.getJobStatus(jobModel.getJobId());
+                }
+                if (oldJobStatus != null) {
+                    oldJobStatus = remoteCluster.cancelJob(jobModel.getJobId());
+                    return oldJobStatus;
+                } else {
+                    throw new TaskException("Cancel operation failed, Job status couldn't find in resource, JobId " +
+                            jobModel.getJobId());
+                }
+            } catch ( GFacException | InterruptedException e) {
+                throw new TaskException("Error while cancelling job " + jobModel.getJobId(), e);
+            }
+        } else {
+            throw new TaskException("Couldn't complete cancel operation, JobModel is null in ProcessContext.");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java
new file mode 100644
index 0000000..6a6a8c0
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java
@@ -0,0 +1,176 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+public class ForkJobSubmissionTask implements JobSubmissionTask {
+ private static final Logger log = LoggerFactory.getLogger(ForkJobSubmissionTask.class);
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+ try {
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ JobModel jobModel = processContext.getJobModel();
+ jobModel.setTaskId(taskContext.getTaskId());
+ RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
+ GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext, taskContext);
+ jobModel.setJobName(groovyMap.get(Script.JOB_NAME).toString());
+ ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+ JobManagerConfiguration jConfig = null;
+ if (resourceJobManager != null) {
+ jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+ }
+ JobStatus jobStatus = new JobStatus();
+ File jobFile = GFacUtils.createJobFile(groovyMap, taskContext, jConfig);
+ if (jobFile != null && jobFile.exists()) {
+ jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+ JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
+ processContext.getWorkingDir());
+ jobModel.setExitCode(jobSubmissionOutput.getExitCode());
+ jobModel.setStdErr(jobSubmissionOutput.getStdErr());
+ jobModel.setStdOut(jobSubmissionOutput.getStdOut());
+ String jobId = jobSubmissionOutput.getJobId();
+ if (jobId != null && !jobId.isEmpty()) {
+ jobModel.setJobId(jobId);
+ GFacUtils.saveJobModel(processContext, jobModel);
+ jobStatus.setJobState(JobState.SUBMITTED);
+ jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext()
+ .getComputeResourceDescription().getHostName());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatuses(Arrays.asList(jobStatus));
+ GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+ taskStatus = new TaskStatus(TaskState.COMPLETED);
+ taskStatus.setReason("Submitted job to compute resource");
+ }
+ if (jobId == null || jobId.isEmpty()) {
+ String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+ "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
+ "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
+ log.error(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(msg);
+ errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ GFacUtils.saveExperimentError(processContext, errorModel);
+ GFacUtils.saveProcessError(processContext, errorModel);
+ GFacUtils.saveTaskError(taskContext, errorModel);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
+ }else {
+ GFacUtils.saveJobModel(processContext, jobModel);
+ }
+ } else {
+ taskStatus.setState(TaskState.FAILED);
+ if (jobFile == null) {
+ taskStatus.setReason("JobFile is null");
+ } else {
+ taskStatus.setReason("Job file doesn't exist");
+ }
+ }
+ } catch (ApplicationSettingsException e) {
+ String msg = "Error occurred while creating job descriptor";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (AppCatalogException e) {
+ String msg = "Error while instantiating app catalog";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (GFacException e) {
+ String msg = "Error occurred while submitting the job";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (IOException e) {
+ String msg = "Error while reading the content of the job file";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ }
+ return taskStatus;
+ }
+
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ //TODO implement recovery scenario instead of calling execute.
+ return execute(taskContext);
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.JOB_SUBMISSION;
+ }
+
+ @Override
+ public JobStatus cancel(TaskContext taskcontext) {
+ // TODO - implement cancel with SSH Fork
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
new file mode 100644
index 0000000..aa2cbbd
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
@@ -0,0 +1,202 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+public class LocalJobSubmissionTask implements JobSubmissionTask{
+ private static final Logger log = LoggerFactory.getLogger(LocalJobSubmissionTask.class);
+ private ProcessBuilder builder;
+
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+ }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+ try {
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ JobModel jobModel = processContext.getJobModel();
+ jobModel.setTaskId(taskContext.getTaskId());
+
+ RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
+ GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext,taskContext);
+
+ String jobId = AiravataUtils.getId("JOB_ID_");
+ jobModel.setJobName(groovyMap.get(Script.JOB_NAME).toString());
+ jobModel.setJobId(jobId);
+
+ ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+ JobManagerConfiguration jConfig = null;
+
+ if (resourceJobManager != null) {
+ jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+ }
+
+ JobStatus jobStatus = new JobStatus();
+ File jobFile = GFacUtils.createJobFile(groovyMap, taskContext, jConfig);
+ if (jobFile != null && jobFile.exists()) {
+ jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+
+ GFacUtils.saveJobModel(processContext, jobModel);
+
+ JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
+ processContext.getWorkingDir());
+
+ jobStatus.setJobState(JobState.SUBMITTED);
+ jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext()
+ .getComputeResourceDescription().getHostName());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatuses(Arrays.asList(jobStatus));
+ //log job submit status
+ GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+
+ //for local, job gets completed synchronously
+ //so changing job status to complete
+
+ jobModel.setExitCode(jobSubmissionOutput.getExitCode());
+ jobModel.setStdErr(jobSubmissionOutput.getStdErr());
+ jobModel.setStdOut(jobSubmissionOutput.getStdOut());
+
+
+ jobModel.setJobId(jobId);
+ jobStatus.setJobState(JobState.COMPLETE);
+ jobStatus.setReason("Successfully Completed " + taskContext.getParentProcessContext()
+ .getComputeResourceDescription().getHostName());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatuses(Arrays.asList(jobStatus));
+ //log job complete status
+ GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+
+
+ taskStatus = new TaskStatus(TaskState.COMPLETED);
+ taskStatus.setReason("Submitted job to compute resource");
+
+ } else {
+ taskStatus.setState(TaskState.FAILED);
+ if (jobFile == null) {
+ taskStatus.setReason("JobFile is null");
+ } else {
+ taskStatus.setReason("Job file doesn't exist");
+ }
+ }
+
+ } catch (GFacException | IOException | AppCatalogException | ApplicationSettingsException e) {
+ String msg = "Error occurred while submitting a local job";
+ log.error(msg, e);
+ taskStatus.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ taskStatus.setState(TaskState.FAILED);
+ }
+ return taskStatus;
+ }
+
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ return null;
+ }
+
+ private List<String> buildCommand(ProcessContext processContext) {
+ List<String> cmdList = new ArrayList<>();
+ cmdList.add(processContext.getApplicationDeploymentDescription().getExecutablePath());
+ List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
+
+ // sort the inputs first and then build the command List
+ Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+ @Override
+ public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+ return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+ }
+ };
+ Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+ for (InputDataObjectType input : processInputs) {
+ sortedInputSet.add(input);
+ }
+ for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+ if (inputDataObjectType.getApplicationArgument() != null
+ && !inputDataObjectType.getApplicationArgument().equals("")) {
+ cmdList.add(inputDataObjectType.getApplicationArgument());
+ }
+
+ if (inputDataObjectType.getValue() != null
+ && !inputDataObjectType.getValue().equals("")) {
+ cmdList.add(inputDataObjectType.getValue());
+ }
+ }
+ return cmdList;
+ }
+
+ private void initProcessBuilder(ApplicationDeploymentDescription app, List<String> cmdList){
+ builder = new ProcessBuilder(cmdList);
+
+ List<SetEnvPaths> setEnvironment = app.getSetEnvironment();
+ if (setEnvironment != null) {
+ for (SetEnvPaths envPath : setEnvironment) {
+ Map<String,String> builderEnv = builder.environment();
+ builderEnv.put(envPath.getName(), envPath.getValue());
+ }
+ }
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.JOB_SUBMISSION;
+ }
+
+ @Override
+ public JobStatus cancel(TaskContext taskcontext) {
+ // TODO - implement Local Job cancel
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
new file mode 100644
index 0000000..360464e
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
@@ -0,0 +1,7 @@
+package org.apache.airavata.worker.task.jobsubmission.utils;
+
/**
 * Static utility helpers for the job-submission worker tasks.
 *
 * Currently an empty placeholder added during the worker refactoring; static
 * helpers will be collected here as tasks are migrated.
 */
public class JobSubmissionUtils {

    // Utility class: suppress the default constructor to prevent instantiation.
    private JobSubmissionUtils() {
    }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/sample
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/sample b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/sample
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java
index abc62aa..11f2c03 100644
--- a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java
@@ -19,8 +19,8 @@
*/
package org.apache.airavata.worker.commons.cluster;
-import org.apache.airavata.gfac.core.JobManagerConfiguration;
-import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.commons.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.commons.utils.JobManagerConfiguration;
public abstract class AbstractRemoteCluster implements RemoteCluster {
http://git-wip-us.apache.org/repos/asf/airavata/blob/aeaf35b4/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
new file mode 100644
index 0000000..022cb79
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
@@ -0,0 +1,34 @@
+package org.apache.airavata.worker.commons.utils;
+
+import org.apache.airavata.worker.commons.cluster.OutputParser;
+import org.apache.airavata.worker.commons.cluster.RawCommandInfo;
+
+/**
+ * Created by goshenoy on 4/12/17.
+ */
+public interface JobManagerConfiguration {
+
+ public RawCommandInfo getCancelCommand(String jobID);
+
+ public String getJobDescriptionTemplateName();
+
+ public RawCommandInfo getMonitorCommand(String jobID);
+
+ public RawCommandInfo getUserBasedMonitorCommand(String userName);
+
+ public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName);
+
+ public String getScriptExtension();
+
+ public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath);
+
+ public OutputParser getParser();
+
+ public String getInstalledPath();
+
+ public String getBaseCancelCommand();
+
+ public String getBaseMonitorCommand();
+
+ public String getBaseSubmitCommand();
+}