You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/10/15 23:51:50 UTC
[1/2] airavata git commit: AutoScheduling gfac side changes
Repository: airavata
Updated Branches:
refs/heads/orchestratorTaskBreakdown 24b83d8f9 -> 10d593c95
http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 946055e..52cd395 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -21,22 +21,24 @@
package org.apache.airavata.gfac.impl;
-import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.gfac.core.GFacEngine;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.monitor.JobMonitor;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.MessageFormat;
+import java.util.List;
+import java.util.Map;
public class GFacWorker implements Runnable {
@@ -46,7 +48,7 @@ public class GFacWorker implements Runnable {
private String processId;
private String gatewayId;
private String tokenId;
- private boolean runOutflow = false;
+ private boolean continueTaskFlow = false;
/**
@@ -61,7 +63,7 @@ public class GFacWorker implements Runnable {
this.tokenId = processContext.getTokenId();
engine = Factory.getGFacEngine();
this.processContext = processContext;
- runOutflow = true;
+ continueTaskFlow = true;
}
/**
@@ -98,19 +100,15 @@ public class GFacWorker implements Runnable {
case CONFIGURING_WORKSPACE:
case INPUT_DATA_STAGING:
case EXECUTING:
- recoverProcess();
- break;
case MONITORING:
- if (runOutflow) {
- runProcessOutflow();
- } else {
- monitorProcess();
- }
- break;
- case OUTPUT_DATA_STAGING:
- case POST_PROCESSING:
- recoverProcessOutflow();
- break;
+ case OUTPUT_DATA_STAGING:
+ case POST_PROCESSING:
+ if (continueTaskFlow) {
+ continueTaskExecution();
+ } else {
+ recoverProcess();
+ }
+ break;
case COMPLETED:
completeProcess();
break;
@@ -160,56 +158,94 @@ public class GFacWorker implements Runnable {
Factory.getGfacContext().removeProcess(processContext.getProcessId());
}
- private void recoverProcessOutflow() throws GFacException {
- engine.recoverProcessOutflow(processContext);
- if (processContext.isInterrupted()) {
- return;
- }
- completeProcess();
- }
+ private void continueTaskExecution() throws GFacException {
+ // checkpoint
+ if (processContext.isInterrupted()) {
+ return;
+ }
+ processContext.setPauseTaskExecution(false);
+ List<String> taskExecutionOrder = processContext.getTaskExecutionOrder();
+ String currentExecutingTaskId = processContext.getCurrentExecutingTaskId();
+ boolean found = false;
+ String nextTaskId = null;
+ for (String taskId : taskExecutionOrder) {
+ if (!found) {
+ if (taskId.equalsIgnoreCase(currentExecutingTaskId)) {
+ found = true;
+ }
+ continue;
+ } else {
+ nextTaskId = taskId;
+ break;
+ }
+ }
+ if (nextTaskId != null) {
+ engine.continueProcess(processContext, nextTaskId);
+ }
+ // checkpoint
+ if (processContext.isInterrupted()) {
+ return;
+ }
- private void runProcessOutflow() throws GFacException {
- engine.runProcessOutflow(processContext);
- if (processContext.isInterrupted()) {
- return;
- }
- completeProcess();
- }
+ if (processContext.isComplete()) {
+ completeProcess();
+ }
+ }
private void recoverProcess() throws GFacException {
- engine.recoverProcess(processContext);
- if (processContext.isInterrupted()) {
- return;
- }
- monitorProcess();
- }
+
+ String taskDag = processContext.getProcessModel().getTaskDag();
+ List<String> taskExecutionOrder = GFacUtils.parseTaskDag(taskDag);
+ Map<String, TaskModel> taskMap = processContext.getTaskMap();
+ String recoverTaskId = null;
+ for (String taskId : taskExecutionOrder) {
+ TaskModel taskModel = taskMap.get(taskId);
+ TaskState state = taskModel.getTaskStatus().getState();
+ if (state == TaskState.CREATED || state == TaskState.EXECUTING) {
+ recoverTaskId = taskId;
+ break;
+ }
+ }
+
+ engine.recoverProcess(processContext, recoverTaskId);
+ if (processContext.isInterrupted()) {
+ return;
+ }
+
+ if (processContext.isComplete()) {
+ completeProcess();
+ }
+ }
private void executeProcess() throws GFacException {
engine.executeProcess(processContext);
if (processContext.isInterrupted()) {
return;
}
- monitorProcess();
- }
- private void monitorProcess() throws GFacException {
- try {
- JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode());
- if (monitorService != null) {
- monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
- ProcessStatus status = new ProcessStatus(ProcessState.MONITORING);
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processContext.setProcessStatus(status);
- GFacUtils.saveAndPublishProcessStatus(processContext);
- } else {
- // we directly invoke outflow
- runProcessOutflow();
- }
- } catch (AiravataException e) {
- throw new GFacException("Error while retrieving moniot service", e);
- }
+ if (processContext.isComplete()) {
+ completeProcess();
+ }
}
+// private void monitorProcess() throws GFacException {
+// try {
+// JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode());
+// if (monitorService != null) {
+// monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
+// ProcessStatus status = new ProcessStatus(ProcessState.MONITORING);
+// status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+// processContext.setProcessStatus(status);
+// GFacUtils.saveAndPublishProcessStatus(processContext);
+// } else {
+// // we directly invoke outflow
+// continueTaskExecution();
+// }
+// } catch (AiravataException e) {
+// throw new GFacException("Error while retrieving moniot service", e);
+// }
+// }
+
private void sendAck() {
try {
long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),
http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 359d39c..c4d4676 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -32,10 +32,6 @@ import org.apache.airavata.gfac.core.monitor.EmailParser;
import org.apache.airavata.gfac.core.monitor.JobMonitor;
import org.apache.airavata.gfac.core.monitor.JobStatusResult;
import org.apache.airavata.gfac.impl.GFacWorker;
-import org.apache.airavata.gfac.monitor.email.parser.LSFEmailParser;
-import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser;
-import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser;
-import org.apache.airavata.gfac.monitor.email.parser.UGEEmailParser;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobState;
@@ -52,7 +48,6 @@ import javax.mail.Session;
import javax.mail.Store;
import javax.mail.search.FlagTerm;
import javax.mail.search.SearchTerm;
-import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -124,6 +119,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
public void monitor(String jobId, ProcessContext processContext) {
log.info("[EJM]: Added monitor Id : " + jobId + " to email based monitor map");
jobMonitorMap.put(jobId, processContext);
+ processContext.setPauseTaskExecution(true);
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index b892a00..4468a1c 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -371,7 +371,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
taskModel.setTaskStatus(taskStatus);
taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel();
- submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
+// submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask);
taskModel.setSubTaskModel(bytes);
orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel,
[2/2] airavata git commit: AutoScheduling gfac side changes
Posted by sh...@apache.org.
AutoScheduling gfac side changes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/10d593c9
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/10d593c9
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/10d593c9
Branch: refs/heads/orchestratorTaskBreakdown
Commit: 10d593c95260b1d3c2b4a578652be4e0dd4a7f95
Parents: 24b83d8
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Thu Oct 15 17:51:38 2015 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Thu Oct 15 17:51:38 2015 -0400
----------------------------------------------------------------------
.../apache/airavata/gfac/core/GFacEngine.java | 6 +-
.../apache/airavata/gfac/core/GFacUtils.java | 260 +----
.../gfac/core/context/ProcessContext.java | 88 +-
.../airavata/gfac/core/context/TaskContext.java | 10 +
.../airavata/gfac/impl/GFacEngineImpl.java | 1085 ++++++++----------
.../apache/airavata/gfac/impl/GFacWorker.java | 144 ++-
.../gfac/monitor/email/EmailBasedMonitor.java | 6 +-
.../cpi/impl/SimpleOrchestratorImpl.java | 2 +-
8 files changed, 684 insertions(+), 917 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
index 176f74c..1d3ad30 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
@@ -29,11 +29,9 @@ public interface GFacEngine {
public void executeProcess(ProcessContext processContext) throws GFacException ;
- public void recoverProcess(ProcessContext processContext) throws GFacException ;
+ public void recoverProcess(ProcessContext processContext, String taskId) throws GFacException;
- public void runProcessOutflow(ProcessContext processContext) throws GFacException ;
-
- public void recoverProcessOutflow(ProcessContext processContext) throws GFacException ;
+ public void continueProcess(ProcessContext processContext, String taskId) throws GFacException ;
public void cancelProcess(ProcessContext processContext) throws GFacException ;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index e51e32b..871a9ab 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -21,11 +21,7 @@
package org.apache.airavata.gfac.core;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.DBUtil;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ZkConstants;
+import org.apache.airavata.common.utils.*;
import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
import org.apache.airavata.gfac.core.context.ProcessContext;
@@ -46,14 +42,12 @@ import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
import org.apache.airavata.model.status.*;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.core.experiment.catalog.model.ProcessOutput;
import org.apache.airavata.registry.cpi.*;
import org.apache.airavata.registry.cpi.utils.Constants;
import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
@@ -86,7 +80,6 @@ public class GFacUtils {
private GFacUtils() {
}
-
/**
* Read data from inputStream and convert it to String.
*
@@ -117,21 +110,6 @@ public class GFacUtils {
}
}
-// /**
-// * This returns true if the give job is finished
-// * otherwise false
-// *
-// * @param job
-// * @return
-// */
-// public static boolean isJobFinished(JobDescriptor job) {
-// if (org.apache.airavata.gfac.core.cluster.JobStatus.C.toString().equals(job.getStatus())) {
-// return true;
-// } else {
-// return false;
-// }
-// }
-
/**
* This will read
*
@@ -158,17 +136,6 @@ public class GFacUtils {
}
}
-// public static boolean isSynchronousMode(
-// JobExecutionContext jobExecutionContext) {
-// GFacConfiguration gFacConfiguration = jobExecutionContext
-// .getGFacConfiguration();
-// if (ExecutionMode.ASYNCHRONOUS.equals(gFacConfiguration
-// .getExecutionMode())) {
-// return false;
-// }
-// return true;
-// }
-
public static String readFileToString(String file)
throws FileNotFoundException, IOException {
BufferedReader instream = null;
@@ -315,225 +282,6 @@ public class GFacUtils {
}
}
-/* public static void saveExperimentStatus(ProcessContext processContext,
- ExperimentState state) throws GFacException {
- try {
- // first we save job jobModel to the registry for sa and then save the job status.
- ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
- ExperimentStatus status = new ExperimentStatus();
- status.setState(state);
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_STATUS, status, processContext.getProcessModel().getExperimentId());
- ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(state, processContext.getProcessModel().getExperimentId(), processContext.getGatewayId());
-
- processContext.getLocalEventPublisher().publish(experimentStatusChangeEvent);
- } catch (Exception e) {
- throw new GFacException("Error persisting experiment status"
- + e.getLocalizedMessage(), e);
- }
- }*/
-
-// public static void updateJobStatus(JobExecutionContext jobExecutionContext,
-// JobDetails details, JobState state) throws GFacException {
-// try {
-// ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
-// JobStatus status = new JobStatus();
-// status.setJobState(state);
-// status.setTimeOfStateChange(Calendar.getInstance()
-// .getTimeInMillis());
-// details.setJobStatus(status);
-// experimentCatalog.update(
-// ExperimentCatalogModelType.JOB_DETAIL,
-// details, details.getJobID());
-// } catch (Exception e) {
-// throw new GFacException("Error persisting job status"
-// + e.getLocalizedMessage(), e);
-// }
-// }
-
-// public static void saveErrorDetails(
-// ProcessContext processContext, String errorMessage)
-// throws GFacException {
-// try {
-// ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
-// ErrorModel details = new ErrorModel();
-// details.setActualErrorMessage(errorMessage);
-// details.setCreationTime(Calendar.getInstance().getTimeInMillis());
-// // FIXME : Save error model according to new data model
-//// experimentCatalog.add(ExpCatChildDataType.ERROR_DETAIL, details,
-//// jobExecutionContext.getTaskData().getTaskID());
-// } catch (Exception e) {
-// throw new GFacException("Error persisting job status"
-// + e.getLocalizedMessage(), e);
-// }
-// }
-
- public static Map<String, Object> getInputParamMap(List<InputDataObjectType> experimentData) throws GFacException {
- Map<String, Object> map = new HashMap<String, Object>();
- for (InputDataObjectType objectType : experimentData) {
- map.put(objectType.getName(), objectType);
- }
- return map;
- }
-
- public static Map<String, Object> getOuputParamMap(List<OutputDataObjectType> experimentData) throws GFacException {
- Map<String, Object> map = new HashMap<String, Object>();
- for (OutputDataObjectType objectType : experimentData) {
- map.put(objectType.getName(), objectType);
- }
- return map;
- }
-
-// public static GfacExperimentState getZKExperimentState(CuratorFramework curatorClient,
-// JobExecutionContext jobExecutionContext)
-// throws Exception {
-// String expState = AiravataZKUtils.getExpState(curatorClient, jobExecutionContext
-// .getExperimentID());
-// if (expState == null || expState.isEmpty()) {
-// return GfacExperimentState.UNKNOWN;
-// }
-// return GfacExperimentState.findByValue(Integer.valueOf(expState));
-// }
-//
-// public static boolean createHandlerZnode(CuratorFramework curatorClient,
-// JobExecutionContext jobExecutionContext, String className)
-// throws Exception {
-// String expState = AiravataZKUtils.getExpZnodeHandlerPath(
-// jobExecutionContext.getExperimentID(), className);
-// Stat exists = curatorClient.checkExists().forPath(expState);
-// if (exists == null) {
-// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(expState, new byte[0]);
-// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-// .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
-// } else {
-// exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-// if (exists == null) {
-// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-// .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
-// }
-// }
-//
-// exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-// if (exists != null) {
-// curatorClient.setData().withVersion(exists.getVersion())
-// .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-// String.valueOf(GfacHandlerState.INVOKING.getValue()).getBytes());
-// }
-// return true;
-// }
-
-// public static boolean createHandlerZnode(CuratorFramework curatorClient,
-// JobExecutionContext jobExecutionContext, String className,
-// GfacHandlerState state) throws Exception {
-// String expState = AiravataZKUtils.getExpZnodeHandlerPath(
-// jobExecutionContext.getExperimentID(), className);
-// Stat exists = curatorClient.checkExists().forPath(expState);
-// if (exists == null) {
-// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-// .forPath(expState, new byte[0]);
-// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-// .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
-// } else {
-// exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-// if (exists == null) {
-// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-// .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
-// }
-// }
-//
-// exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-// if (exists != null) {
-// curatorClient.setData().withVersion(exists.getVersion())
-// .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-// String.valueOf(state.getValue()).getBytes());
-// }
-// return true;
-// }
-
-// public static boolean updateHandlerState(CuratorFramework curatorClient,
-// JobExecutionContext jobExecutionContext, String className,
-// GfacHandlerState state) throws Exception {
-// String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
-// jobExecutionContext.getExperimentID(), className);
-// Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-// if (exists != null) {
-// curatorClient.setData().withVersion(exists.getVersion())
-// .forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes());
-// } else {
-// createHandlerZnode(curatorClient, jobExecutionContext, className, state);
-// }
-// return false;
-// }
-
-// public static GfacHandlerState getHandlerState(CuratorFramework curatorClient,
-// JobExecutionContext jobExecutionContext, String className) {
-// try {
-// String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath( jobExecutionContext.getExperimentID(), className);
-// Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-// if (exists != null) {
-// String stateVal = new String(curatorClient.getData().storingStatIn(exists)
-// .forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE));
-// return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
-// }
-// return GfacHandlerState.UNKNOWN; // if the node doesn't exist or any other error we
-// // return false
-// } catch (Exception e) {
-// log.error("Error occured while getting zk node status", e);
-// return null;
-// }
-// }
-
-// // This method is dangerous because of moving the experiment data
-// public static boolean createExperimentEntryForPassive(String experimentID,
-// String taskID, CuratorFramework curatorClient, String experimentNode,
-// String pickedChild, String tokenId, long deliveryTag) throws Exception {
-// String experimentPath = experimentNode + File.separator + pickedChild;
-// String newExperimentPath = experimentPath + File.separator + experimentID;
-// Stat exists1 = curatorClient.checkExists().forPath(newExperimentPath);
-// String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, curatorClient);
-// if (oldExperimentPath == null) { // this means this is a very new experiment
-// // are going to create a new node
-// log.info("This is a new Job, so creating all the experiment docs from the scratch");
-// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath, new byte[0]);
-// String stateNodePath = curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-// .forPath(newExperimentPath + File.separator + "state",
-// String .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes());
-//
-// if(curatorClient.checkExists().forPath(stateNodePath)!=null) {
-// log.info("Created the node: " + stateNodePath + " successfully !");
-// }else {
-// log.error("Error creating node: " + stateNodePath + " successfully !");
-// }
-// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-// .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
-// } else {
-// log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
-// removeCancelDeliveryTagNode(oldExperimentPath, curatorClient); // remove previous cancel deliveryTagNode
-// if(newExperimentPath.equals(oldExperimentPath)){
-// log.info("Re-launch experiment came to the same GFac instance");
-// }else {
-// log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
-// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath,
-// curatorClient.getData().storingStatIn(exists1).forPath(oldExperimentPath)); // recursively copy children
-// copyChildren(curatorClient, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
-// String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
-// Stat exists = curatorClient.checkExists().forPath(oldDeliveryTag);
-// if(exists!=null) {
-// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-// .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
-// curatorClient.getData().storingStatIn(exists).forPath(oldDeliveryTag));
-// ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldDeliveryTag, true);
-// }
-// // After all the files are successfully transfered we delete the // old experiment,otherwise we do
-// // not delete a single file
-// log.info("After a successful copying of experiment data for an old experiment we delete the old data");
-// log.info("Deleting experiment data: " + oldExperimentPath);
-// ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldExperimentPath, true);
-// }
-// }
-// return true;
-// }
-
private static void removeCancelDeliveryTagNode(String experimentPath, CuratorFramework curatorClient) throws Exception {
Stat exists = curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
if (exists != null) {
@@ -1202,4 +950,10 @@ public class GFacUtils {
}
return jobModel;
}
+
+ public static List<String> parseTaskDag(String taskDag) {
+ // TODO - parse taskDag and create taskId list
+ List<String> taskIds = new ArrayList<>();
+ return taskIds;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index c2f4d83..47a7430 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -38,12 +38,14 @@ import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.task.TaskModel;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -63,7 +65,6 @@ public class ProcessContext {
private String inputDir;
private String outputDir;
private String localWorkingDir;
- private List<TaskContext> taskChain;
private GatewayResourceProfile gatewayResourceProfile;
private ComputeResourceDescription computeResourceDescription;
private ApplicationDeploymentDescription applicationDeploymentDescription;
@@ -81,6 +82,13 @@ public class ProcessContext {
private boolean handOver;
private boolean cancel;
private ServerInfo serverInfo;
+ private List<String> taskExecutionOrder;
+ private List<TaskModel> taskList;
+ private Map<String, TaskModel> taskMap;
+ private String currentExecutingTaskId; // TaskId of current executing task.
+ private boolean pauseTaskExecution = false; // Task can pause task execution by setting this value
+ private boolean complete = false; // all tasks executed?
+ private boolean recovery = false; // is process in recovery mode?
/**
* Note: process context property use lazy loading approach. In runtime you will see some properties as null
@@ -162,14 +170,6 @@ public class ProcessContext {
this.workingDir = workingDir;
}
- public List<TaskContext> getTaskChain() {
- return taskChain;
- }
-
- public void setTaskChain(List<TaskContext> taskChain) {
- this.taskChain = taskChain;
- }
-
public GatewayResourceProfile getGatewayResourceProfile() {
return gatewayResourceProfile;
}
@@ -279,6 +279,44 @@ public class ProcessContext {
this.dataMovementProtocol = dataMovementProtocol;
}
+ public String getTaskDag() {
+ return getProcessModel().getTaskDag();
+ }
+
+ public List<TaskModel> getTaskList() {
+ if (taskList == null) {
+ synchronized (TaskModel.class){
+ if (taskList == null) {
+ taskList = getProcessModel().getTasks();
+ }
+ }
+ }
+ return taskList;
+ }
+
+
+ public List<String> getTaskExecutionOrder() {
+ return taskExecutionOrder;
+ }
+
+ public void setTaskExecutionOrder(List<String> taskExecutionOrder) {
+ this.taskExecutionOrder = taskExecutionOrder;
+ }
+
+ public Map<String, TaskModel> getTaskMap() {
+ if (taskMap == null) {
+ synchronized (TaskModel.class) {
+ if (taskMap == null) {
+ taskMap = new HashMap<>();
+ for (TaskModel taskModel : getTaskList()) {
+ taskMap.put(taskModel.getTaskId(), taskModel);
+ }
+ }
+ }
+ }
+ return taskMap;
+ }
+
public JobModel getJobModel() {
if (jobModel == null) {
jobModel = new JobModel();
@@ -376,4 +414,36 @@ public class ProcessContext {
public ServerInfo getServerInfo() {
return serverInfo;
}
+
+ public String getCurrentExecutingTaskId() {
+ return currentExecutingTaskId;
+ }
+
+ public void setCurrentExecutingTaskId(String currentExecutingTaskId) {
+ this.currentExecutingTaskId = currentExecutingTaskId;
+ }
+
+ public boolean isPauseTaskExecution() {
+ return pauseTaskExecution;
+ }
+
+ public void setPauseTaskExecution(boolean pauseTaskExecution) {
+ this.pauseTaskExecution = pauseTaskExecution;
+ }
+
+ public boolean isComplete() {
+ return complete;
+ }
+
+ public void setComplete(boolean complete) {
+ this.complete = complete;
+ }
+
+ public boolean isRecovery() {
+ return recovery;
+ }
+
+ public void setRecovery(boolean recovery) {
+ this.recovery = recovery;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
index f1400e3..ae92ba1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
@@ -20,12 +20,14 @@
*/
package org.apache.airavata.gfac.core.context;
+import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.TaskModel;
import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +38,7 @@ public class TaskContext {
private ProcessContext parentProcessContext;
private InputDataObjectType processInput;
private OutputDataObjectType processOutput;
+ private Object subTaskModel = null;
public TaskModel getTaskModel() {
return taskModel;
@@ -107,4 +110,11 @@ public class TaskContext {
public String getExperimentId() {
return parentProcessContext.getExperimentId();
}
+
+ public Object getSubTaskModel() throws TException {
+ if (subTaskModel == null) {
+ subTaskModel = ThriftUtils.getSubTaskModel(getTaskModel());
+ }
+ return subTaskModel;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 834399e..07a03c1 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -25,7 +25,6 @@ import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.GFacEngine;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
@@ -38,11 +37,7 @@ import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.task.TaskException;
import org.apache.airavata.gfac.impl.task.SSHEnvironmentSetupTask;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.InputDataObjectType;
@@ -56,9 +51,7 @@ import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.model.status.TaskStatus;
-import org.apache.airavata.model.task.DataStagingTaskModel;
-import org.apache.airavata.model.task.TaskModel;
-import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.model.task.*;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.airavata.registry.cpi.ExpCatChildDataType;
@@ -69,44 +62,40 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
public class GFacEngineImpl implements GFacEngine {
- private static final Logger log = LoggerFactory.getLogger(GFacEngineImpl.class);
-
- public GFacEngineImpl() throws GFacException {
-
- }
-
- @Override
- public ProcessContext populateProcessContext(String processId, String gatewayId, String
- tokenId) throws GFacException {
- try {
- ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
- AppCatalog appCatalog = Factory.getDefaultAppCatalog();
- processContext.setAppCatalog(appCatalog);
- ExperimentCatalog expCatalog = Factory.getDefaultExpCatalog();
- processContext.setExperimentCatalog(expCatalog);
- processContext.setCuratorClient(Factory.getCuratorClient());
- processContext.setStatusPublisher(Factory.getStatusPublisher());
-
- ProcessModel processModel = (ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
- processContext.setProcessModel(processModel);
- GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
- processContext.setGatewayResourceProfile(gatewayProfile);
- processContext.setComputeResourcePreference(appCatalog.getGatewayProfile().getComputeResourcePreference
- (gatewayId, processModel.getComputeResourceId()));
- processContext.setComputeResourceDescription(appCatalog.getComputeResource().getComputeResource
- (processContext.getComputeResourcePreference().getComputeResourceId()));
- processContext.setApplicationDeploymentDescription(appCatalog.getApplicationDeployment()
- .getApplicationDeployement(processModel.getApplicationDeploymentId()));
+ private static final Logger log = LoggerFactory.getLogger(GFacEngineImpl.class);
+
+ public GFacEngineImpl() throws GFacException {
+
+ }
+
+ @Override
+ public ProcessContext populateProcessContext(String processId, String gatewayId, String
+ tokenId) throws GFacException {
+ try {
+ ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
+ AppCatalog appCatalog = Factory.getDefaultAppCatalog();
+ processContext.setAppCatalog(appCatalog);
+ ExperimentCatalog expCatalog = Factory.getDefaultExpCatalog();
+ processContext.setExperimentCatalog(expCatalog);
+ processContext.setCuratorClient(Factory.getCuratorClient());
+ processContext.setStatusPublisher(Factory.getStatusPublisher());
+
+ ProcessModel processModel = (ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
+ processContext.setProcessModel(processModel);
+ GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
+ processContext.setGatewayResourceProfile(gatewayProfile);
+ processContext.setComputeResourcePreference(appCatalog.getGatewayProfile().getComputeResourcePreference
+ (gatewayId, processModel.getComputeResourceId()));
+ processContext.setComputeResourceDescription(appCatalog.getComputeResource().getComputeResource
+ (processContext.getComputeResourcePreference().getComputeResourceId()));
+ processContext.setApplicationDeploymentDescription(appCatalog.getApplicationDeployment()
+ .getApplicationDeployement(processModel.getApplicationDeploymentId()));
ApplicationInterfaceDescription applicationInterface = appCatalog.getApplicationInterface()
.getApplicationInterface(processModel.getApplicationInterfaceId());
processContext.setApplicationInterfaceDescription(applicationInterface);
@@ -115,22 +104,22 @@ public class GFacEngineImpl implements GFacEngine {
ServerInfo serverInfo = new ServerInfo(processContext.getComputeResourcePreference().getLoginUserName(), hostName);
processContext.setServerInfo(serverInfo);
List<OutputDataObjectType> applicationOutputs = applicationInterface.getApplicationOutputs();
- if (applicationOutputs != null && !applicationOutputs.isEmpty()){
- for (OutputDataObjectType outputDataObjectType : applicationOutputs){
- if (outputDataObjectType.getType().equals(DataType.STDOUT)){
- if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")){
- outputDataObjectType.setValue(applicationInterface.getApplicationName()+ ".stdout");
- processContext.setStdoutLocation(applicationInterface.getApplicationName()+ ".stdout");
- }else {
+ if (applicationOutputs != null && !applicationOutputs.isEmpty()) {
+ for (OutputDataObjectType outputDataObjectType : applicationOutputs) {
+ if (outputDataObjectType.getType().equals(DataType.STDOUT)) {
+ if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) {
+ outputDataObjectType.setValue(applicationInterface.getApplicationName() + ".stdout");
+ processContext.setStdoutLocation(applicationInterface.getApplicationName() + ".stdout");
+ } else {
processContext.setStdoutLocation(outputDataObjectType.getValue());
}
}
- if (outputDataObjectType.getType().equals(DataType.STDERR)){
- if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")){
+ if (outputDataObjectType.getType().equals(DataType.STDERR)) {
+ if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) {
String stderrLocation = applicationInterface.getApplicationName() + ".stderr";
outputDataObjectType.setValue(stderrLocation);
processContext.setStderrLocation(stderrLocation);
- }else {
+ } else {
processContext.setStderrLocation(outputDataObjectType.getValue());
}
}
@@ -139,479 +128,400 @@ public class GFacEngineImpl implements GFacEngine {
expCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId);
processModel.setProcessOutputs(applicationOutputs);
processContext.setResourceJobManager(getResourceJobManager(processContext));
- processContext.setRemoteCluster(Factory.getRemoteCluster(processContext));
-
- String inputPath = ServerSettings.getLocalDataLocation();
- if (inputPath != null) {
- processContext.setLocalWorkingDir((inputPath.endsWith("/") ? inputPath : inputPath + "/") +
- processContext.getProcessId());
- }
-
- List<Object> jobModels = expCatalog.get(ExperimentCatalogModelType.JOB, "processId", processId);
- if (jobModels != null && !jobModels.isEmpty()) {
- if (jobModels.size() > 1) {
- log.warn("Process has more than one job model, take first one");
- }
- processContext.setJobModel(((JobModel) jobModels.get(0)));
- }
- return processContext;
- } catch (AppCatalogException e) {
- throw new GFacException("App catalog access exception ", e);
- } catch (RegistryException e) {
- throw new GFacException("Registry access exception", e);
- } catch (AiravataException e) {
- throw new GFacException("Remote cluster initialization error", e);
- }
- }
-
- @Override
- public void executeProcess(ProcessContext processContext) throws GFacException {
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return;
- }
-// List<TaskContext> taskChain = new ArrayList<>();
- if (configureWorkspace(processContext, false)) return;
-
- // exit if process is handed over to another instance while input staging
- if (inputDataStaging(processContext, false)) return;
-
- // exit if process is handed orver to another instance while job submission.
- if (executeJobSubmission(processContext)) return;
-// processContext.setTaskChain(taskChain);
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return;
- }
- }
-
- private boolean executeJobSubmission(ProcessContext processContext) throws GFacException {
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return true;
- }
- TaskContext taskCtx;
- TaskStatus taskStatus;
- ProcessStatus status = new ProcessStatus(ProcessState.EXECUTING);
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processContext.setProcessStatus(status);
- JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return true;
- }
- GFacUtils.saveAndPublishProcessStatus(processContext);
- taskCtx = getJobSubmissionTaskContext(processContext);
- saveTaskModel(taskCtx);
- GFacUtils.saveAndPublishTaskStatus(taskCtx);
- taskStatus = executeTask(taskCtx, jobSubmissionTask, false);
- if (taskStatus.getState() == TaskState.FAILED) {
- log.error("expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " +
- "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
- .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), jobSubmissionTask.getType
- ().name(), taskStatus.getReason());
- String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " +
- "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() + taskCtx.getTaskId() + jobSubmissionTask.getType().name() + taskStatus.getReason();
- ErrorModel errorModel = new ErrorModel();
- errorModel.setUserFriendlyMessage("Job submission task failed");
- errorModel.setActualErrorMessage(errorMsg);
- GFacUtils.saveTaskError(taskCtx, errorModel);
- ProcessStatus processStatus = processContext.getProcessStatus();
- processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processStatus.setReason(errorMsg);
- processStatus.setState(ProcessState.FAILED);
- processContext.setProcessStatus(processStatus);
- GFacUtils.saveAndPublishProcessStatus(processContext);
- throw new GFacException("Job submission task failed");
- }
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return true;
- }
- return false;
- }
-
- private boolean inputDataStaging(ProcessContext processContext, boolean recover) throws GFacException {
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return true;
- }
- TaskContext taskCtx;
- TaskStatus taskStatus;// execute process inputs
- ProcessStatus status = new ProcessStatus(ProcessState.INPUT_DATA_STAGING);
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processContext.setProcessStatus(status);
- GFacUtils.saveAndPublishProcessStatus(processContext);
- List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
- sortByInputOrder(processInputs);
- if (processInputs != null) {
- for (InputDataObjectType processInput : processInputs) {
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return true;
- }
- DataType type = processInput.getType();
- switch (type) {
- case STDERR:
- break;
- case STDOUT:
- break;
- case URI:
- try {
- taskCtx = getDataStagingTaskContext(processContext, processInput);
- } catch (TException | TaskException e) {
- throw new GFacException("Error while serializing data staging sub task model");
- }
- saveTaskModel(taskCtx);
- GFacUtils.saveAndPublishTaskStatus(taskCtx);
- Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
- taskStatus = executeTask(taskCtx, dMoveTask, false);
- if (taskStatus.getState() == TaskState.FAILED) {
- log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
- "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
- .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), dMoveTask.getType
- ().name(), taskStatus.getReason());
- String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " +
- "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() + taskCtx.getTaskId() + dMoveTask.getType().name() + taskStatus.getReason();
- ErrorModel errorModel = new ErrorModel();
- errorModel.setUserFriendlyMessage("Error while staging input data");
- errorModel.setActualErrorMessage(errorMsg);
- GFacUtils.saveTaskError(taskCtx, errorModel);
- ProcessStatus processStatus = processContext.getProcessStatus();
- processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processStatus.setReason(errorMsg);
- processStatus.setState(ProcessState.FAILED);
- processContext.setProcessStatus(processStatus);
+ processContext.setRemoteCluster(Factory.getRemoteCluster(processContext));
+
+ String inputPath = ServerSettings.getLocalDataLocation();
+ if (inputPath != null) {
+ processContext.setLocalWorkingDir((inputPath.endsWith("/") ? inputPath : inputPath + "/") +
+ processContext.getProcessId());
+ }
+
+ List<Object> jobModels = expCatalog.get(ExperimentCatalogModelType.JOB, "processId", processId);
+ if (jobModels != null && !jobModels.isEmpty()) {
+ if (jobModels.size() > 1) {
+ log.warn("Process has more than one job model, take first one");
+ }
+ processContext.setJobModel(((JobModel) jobModels.get(0)));
+ }
+ return processContext;
+ } catch (AppCatalogException e) {
+ throw new GFacException("App catalog access exception ", e);
+ } catch (RegistryException e) {
+ throw new GFacException("Registry access exception", e);
+ } catch (AiravataException e) {
+ throw new GFacException("Remote cluster initialization error", e);
+ }
+ }
+
+ @Override
+ public void executeProcess(ProcessContext processContext) throws GFacException {
+ if (processContext.isInterrupted()) {
+ GFacUtils.handleProcessInterrupt(processContext);
+ return;
+ }
+ String taskDag = processContext.getTaskDag();
+ List<String> taskIds = GFacUtils.parseTaskDag(taskDag);
+ processContext.setTaskExecutionOrder(taskIds);
+ executeTaskListFrom(processContext, taskIds.get(0));
+ }
+
+ private void executeTaskListFrom(ProcessContext processContext, String startingTaskId) throws GFacException {
+ // checkpoint
+ if (processContext.isInterrupted()) {
+ GFacUtils.handleProcessInterrupt(processContext);
+ return;
+ }
+ List<TaskModel> taskList = processContext.getTaskList();
+ Map<String, TaskModel> taskMap = processContext.getTaskMap();
+ boolean fastForward = true;
+ for (String taskId : processContext.getTaskExecutionOrder()) {
+ if (fastForward) {
+ if (taskId.equalsIgnoreCase(startingTaskId)) {
+ fastForward = false;
+ } else {
+ continue;
+ }
+ }
+
+ TaskModel taskModel = taskMap.get(taskId);
+ TaskTypes taskType = taskModel.getTaskType();
+ TaskContext taskContext = getTaskContext(processContext);
+ taskContext.setTaskModel(taskModel);
+ ProcessStatus status = null;
+ processContext.setCurrentExecutingTaskId(taskId);
+ switch (taskType) {
+ case ENV_SETUP:
+ status = new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE);
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ processContext.setProcessStatus(status);
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ // checkpoint
+ if (processContext.isInterrupted()) {
+ GFacUtils.handleProcessInterrupt(processContext);
+ return;
+ }
+ configureWorkspace(taskContext, processContext.isRecovery());
+ // checkpoint
+ if (processContext.isInterrupted()) {
+ GFacUtils.handleProcessInterrupt(processContext);
+ return;
+ }
+ break;
+ case DATA_STAGING:
+ try {
+ // checkpoint
+ if (processContext.isInterrupted()) {
+ GFacUtils.handleProcessInterrupt(processContext);
+ return;
+ }
+ DataStagingTaskModel subTaskModel = (DataStagingTaskModel) taskContext.getSubTaskModel();
+ if (subTaskModel.getType() == DataStageType.INPUT) {
+ status = new ProcessStatus(ProcessState.INPUT_DATA_STAGING);
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ processContext.setProcessStatus(status);
GFacUtils.saveAndPublishProcessStatus(processContext);
- throw new GFacException("Error while staging input data");
- }
- break;
- default:
- // nothing to do
- break;
- }
- }
- }
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return true;
- }
- return false;
- }
-
- private boolean configureWorkspace(ProcessContext processContext, boolean recover) throws GFacException {
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return true;
- }
- TaskContext taskCtx;
- ProcessStatus status = new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE);
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processContext.setProcessStatus(status);
- GFacUtils.saveAndPublishProcessStatus(processContext);
- // Run all environment setup tasks
- taskCtx = getEnvSetupTaskContext(processContext);
- saveTaskModel(taskCtx);
- SSHEnvironmentSetupTask envSetupTask = new SSHEnvironmentSetupTask();
- TaskStatus taskStatus = executeTask(taskCtx, envSetupTask, recover);
- GFacUtils.saveAndPublishTaskStatus(taskCtx);
+ inputDataStaging(taskContext, processContext.isRecovery());
+ } else {
+ status = new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ processContext.setProcessStatus(status);
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ outputDataStaging(taskContext, processContext.isRecovery());
+ }
+ // checkpoint
+ if (processContext.isInterrupted()) {
+ GFacUtils.handleProcessInterrupt(processContext);
+ return;
+ }
+ } catch (TException e) {
+ throw new GFacException(e);
+ }
+ break;
+
+ case JOB_SUBMISSION:
+ // checkpoint
+ if (processContext.isInterrupted()) {
+ GFacUtils.handleProcessInterrupt(processContext);
+ return;
+ }
+ status = new ProcessStatus(ProcessState.EXECUTING);
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ processContext.setProcessStatus(status);
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ executeJobSubmission(taskContext, processContext.isRecovery());
+ // checkpoint
+ if (processContext.isInterrupted()) {
+ GFacUtils.handleProcessInterrupt(processContext);
+ return;
+ }
+ break;
+
+ case MONITORING:
+ JobMonitor monitorService = null;
+ try {
+ MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel());
+ status = new ProcessStatus(ProcessState.MONITORING);
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ processContext.setProcessStatus(status);
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
+ monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
+ } catch (AiravataException | TException e) {
+ throw new GFacException(e);
+ }
+ break;
+
+ case ENV_CLEANUP:
+ // TODO implement environment clean up task logic
+ break;
+
+ default:
+ throw new GFacException("Unsupported Task type");
+
+ }
+
+ if (processContext.isPauseTaskExecution()) {
+ return; // If any task put processContext to wait, the same task should continue processContext execution.
+ }
+
+ }
+ processContext.setComplete(true);
+ }
+
+ private boolean executeJobSubmission(TaskContext taskContext, boolean recovery) throws GFacException {
+ TaskStatus taskStatus;
+ try {
+ JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel());
+ JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(jobSubmissionTaskModel.getJobSubmissionProtocol());
+
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ taskStatus = executeTask(taskContext, jobSubmissionTask, recovery);
+ if (taskStatus.getState() == TaskState.FAILED) {
+ log.error("expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " +
+ "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
+ .getParentProcessContext().getProcessId(), taskContext.getTaskId(), jobSubmissionTask.getType
+ ().name(), taskStatus.getReason());
+ String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " +
+ "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() +
+ taskContext.getTaskId() + jobSubmissionTask.getType().name() + taskStatus.getReason();
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setUserFriendlyMessage("Job submission task failed");
+ errorModel.setActualErrorMessage(errorMsg);
+ GFacUtils.saveTaskError(taskContext, errorModel);
+ throw new GFacException("Job submission task failed");
+ }
+ return false;
+ } catch (TException e) {
+ throw new GFacException(e);
+ }
+ }
+
+ private boolean configureWorkspace(TaskContext taskContext, boolean recover) throws GFacException {
+
+ try {
+ EnvironmentSetupTaskModel subTaskModel = (EnvironmentSetupTaskModel) taskContext.getSubTaskModel();
+ Task envSetupTask = null;
+ if (subTaskModel.getProtocol() == SecurityProtocol.SSH_KEYS) {
+ envSetupTask = new SSHEnvironmentSetupTask();
+ } else {
+ throw new GFacException("Unsupported security protocol, Airavata doesn't support " +
+ subTaskModel.getProtocol().name() + " protocol yet.");
+ }
+ TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.setTaskStatus(status);
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+ TaskStatus taskStatus = executeTask(taskContext, envSetupTask, recover);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.setTaskStatus(taskStatus);
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+
+ if (taskStatus.getState() == TaskState.FAILED) {
+ log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " +
+ "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
+ .getParentProcessContext().getProcessId(), taskContext.getTaskId(), envSetupTask.getType
+ ().name(), taskStatus.getReason());
+ String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " +
+ "reason:" + " {}" + taskContext.getExperimentId() + taskContext.getProcessId() + taskContext.getTaskId() + envSetupTask.getType().name() + taskStatus.getReason();
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setUserFriendlyMessage("Error while environment setup");
+ errorModel.setActualErrorMessage(errorMsg);
+ GFacUtils.saveTaskError(taskContext, errorModel);
+ throw new GFacException("Error while environment setup");
+ }
+ } catch (TException e) {
+ throw new GFacException("Couldn't get environment setup task model", e);
+ }
+ return false;
+ }
+
+ private boolean inputDataStaging(TaskContext taskContext, boolean recover) throws GFacException {
+ TaskStatus taskStatus;// execute process inputs
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+ taskStatus = executeTask(taskContext, dMoveTask, false);
if (taskStatus.getState() == TaskState.FAILED) {
- log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " +
- "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
- .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), envSetupTask.getType
- ().name(), taskStatus.getReason());
+ log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
+ "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
+ .getParentProcessContext().getProcessId(), taskContext.getTaskId(), dMoveTask.getType
+ ().name(), taskStatus.getReason());
String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " +
- "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() + taskCtx.getTaskId() + envSetupTask.getType().name() + taskStatus.getReason();
+ "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() +
+ taskContext.getTaskId() + dMoveTask.getType().name() + taskStatus.getReason();
ErrorModel errorModel = new ErrorModel();
- errorModel.setUserFriendlyMessage("Error while environment setup");
+ errorModel.setUserFriendlyMessage("Error while staging input data");
errorModel.setActualErrorMessage(errorMsg);
- GFacUtils.saveTaskError(taskCtx, errorModel);
- ProcessStatus processStatus = processContext.getProcessStatus();
- processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processStatus.setReason(errorMsg);
- processContext.setProcessStatus(processStatus);
- GFacUtils.saveAndPublishProcessStatus(processContext);
- throw new GFacException("Error while environment setup");
- }
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return true;
- }
- return false;
- }
-
-
- @Override
- public void recoverProcess(ProcessContext processContext) throws GFacException {
- ProcessState state = processContext.getProcessStatus().getState();
- switch (state) {
- case CREATED:
- case VALIDATED:
- case STARTED:
- executeProcess(processContext);
- break;
- case PRE_PROCESSING:
- case CONFIGURING_WORKSPACE:
- if (configureWorkspace(processContext, true)) return;
- if (inputDataStaging(processContext, false)) return;
- if (executeJobSubmission(processContext)) return;
- break;
- case INPUT_DATA_STAGING:
- if (inputDataStaging(processContext, true)) return;
- if (executeJobSubmission(processContext)) return;
- break;
- case EXECUTING:
- JobModel jobModel = getJobModel(processContext);
- if (jobModel == null || jobModel.getJobId() == null) {
- if (executeJobSubmission(processContext)) return;
- } else {
- processContext.setJobModel(jobModel);
- return;
- }
- break;
- default:
- throw new GFacException("Invalid process recovery invocation");
- }
- }
-
- private JobModel getJobModel(ProcessContext processContext) {
- try {
- return GFacUtils.getJobModel(processContext);
- } catch (RegistryException e) {
- log.error("Error while retrieving jobId,", e);
- return null;
- }
- }
-
- @Override
- public void runProcessOutflow(ProcessContext processContext) throws GFacException {
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return;
- }
- // exit if process is handed over to another instance while output staging.
- if (outputDataStaging(processContext, false)) return;
-
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return;
- }
-
- postProcessing(processContext,false);
-
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- }
- }
-
- /**
- *
- * @param processContext
- * @param recovery
- * @return <code>true</code> if you need to interrupt processing <code>false</code> otherwise.
- * @throws GFacException
- */
- private boolean postProcessing(ProcessContext processContext, boolean recovery) throws GFacException {
+ GFacUtils.saveTaskError(taskContext, errorModel);
+ throw new GFacException("Error while staging input data");
+ }
+ return false;
+ }
+
+
+ @Override
+ public void recoverProcess(ProcessContext processContext, String recoverTaskId) throws GFacException {
+ processContext.setRecovery(true);
+ continueProcess(processContext, recoverTaskId);
+ }
+
+ private JobModel getJobModel(ProcessContext processContext) {
+ try {
+ return GFacUtils.getJobModel(processContext);
+ } catch (RegistryException e) {
+ log.error("Error while retrieving jobId,", e);
+ return null;
+ }
+ }
+
+ @Override
+ public void continueProcess(ProcessContext processContext, String taskId) throws GFacException {
+ if (processContext.isInterrupted()) {
+ GFacUtils.handleProcessInterrupt(processContext);
+ return;
+ }
+ executeTaskListFrom(processContext, taskId);
+ }
+
+ /**
+ * @param processContext
+ * @param recovery
+ * @return <code>true</code> if you need to interrupt processing <code>false</code> otherwise.
+ * @throws GFacException
+ */
+ private boolean postProcessing(ProcessContext processContext, boolean recovery) throws GFacException {
ProcessStatus status = new ProcessStatus(ProcessState.POST_PROCESSING);
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
processContext.setProcessStatus(status);
- GFacUtils.saveAndPublishProcessStatus(processContext);
+ GFacUtils.saveAndPublishProcessStatus(processContext);
// taskCtx = getEnvCleanupTaskContext(processContext);
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return true;
- }
- return false;
- }
-
- /**
- *
- * @param processContext
- * @param recovery
- * @return <code>true</code> if process execution interrupted , <code>false</code> otherwise.
- * @throws GFacException
- */
- private boolean outputDataStaging(ProcessContext processContext, boolean recovery) throws GFacException {
- TaskContext taskCtx;
- ProcessStatus status = new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processContext.setProcessStatus(status);
- GFacUtils.saveAndPublishProcessStatus(processContext);
- File localWorkingdir = new File(processContext.getLocalWorkingDir());
- localWorkingdir.mkdirs(); // make local dir if not exist
- List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
- for (OutputDataObjectType processOutput : processOutputs) {
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return true;
- }
- DataType type = processOutput.getType();
- switch (type) {
- case URI: case STDERR: case STDOUT:
- try {
- taskCtx = getDataStagingTaskContext(processContext, processOutput);
- } catch (TException | TaskException e) {
- throw new GFacException("Thrift model to byte[] conversion issue", e);
- }
- saveTaskModel(taskCtx);
- GFacUtils.saveAndPublishTaskStatus(taskCtx);
- Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
- TaskStatus taskStatus = executeTask(taskCtx, dMoveTask, recovery);
- if (taskStatus.getState() == TaskState.FAILED) {
- log.error("expId: {}, processId: {}, taskId: {} type: {},:- output staging failed, " +
- "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
- .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), dMoveTask.getType
- ().name(), taskStatus.getReason());
-
- String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- output staging failed, " +
- "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() + taskCtx.getTaskId() + dMoveTask.getType().name() + taskStatus.getReason();
- ErrorModel errorModel = new ErrorModel();
- errorModel.setUserFriendlyMessage("Error while staging output data");
- errorModel.setActualErrorMessage(errorMsg);
- GFacUtils.saveTaskError(taskCtx, errorModel);
- ProcessStatus processStatus = processContext.getProcessStatus();
- processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processStatus.setReason(errorMsg);
- processStatus.setState(ProcessState.FAILED);
- processContext.setProcessStatus(processStatus);
- GFacUtils.saveAndPublishProcessStatus(processContext);
- throw new GFacException("Error while staging output data");
- }
- break;
- default:
- // nothing to do
- break;
- }
- }
- return false;
- }
-
- @Override
- public void recoverProcessOutflow(ProcessContext processContext) throws GFacException {
- ProcessState processState = processContext.getProcessStatus().getState();
- switch (processState) {
- case OUTPUT_DATA_STAGING:
- if (outputDataStaging(processContext, true)) return;
- if (postProcessing(processContext, false)) return;
- case POST_PROCESSING:
- postProcessing(processContext, true);
- break;
- }
- runProcessOutflow(processContext); // TODO implement recover steps
- }
-
- @Override
- public void cancelProcess(ProcessContext processContext) throws GFacException {
- if (processContext.getProcessState() == ProcessState.MONITORING) {
- // get job submission task and invoke cancel
- JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
- TaskContext taskCtx = getJobSubmissionTaskContext(processContext);
- executeCancel(taskCtx, jobSubmissionTask);
- }
- }
-
- private TaskStatus executeTask(TaskContext taskCtx, Task task, boolean recover) throws GFacException {
- TaskStatus status = new TaskStatus(TaskState.EXECUTING);
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- taskCtx.setTaskStatus(status);
- GFacUtils.saveAndPublishTaskStatus(taskCtx);
- TaskStatus taskStatus = null;
- if (recover) {
- taskStatus = task.recover(taskCtx);
- } else {
- taskStatus = task.execute(taskCtx);
- }
- taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- taskCtx.setTaskStatus(taskStatus);
- GFacUtils.saveAndPublishTaskStatus(taskCtx);
- return taskCtx.getTaskStatus();
- }
-
- private void executeCancel(TaskContext taskContext, JobSubmissionTask jSTask) throws GFacException {
- try {
- JobStatus oldJobStatus = jSTask.cancel(taskContext);
-
- if (oldJobStatus != null && oldJobStatus.getJobState() == JobState.QUEUED) {
- JobMonitor monitorService = Factory.getMonitorService(taskContext.getParentProcessContext().getMonitorMode());
- monitorService.stopMonitor(taskContext.getParentProcessContext().getJobModel().getJobId(), true);
- JobStatus newJobStatus = new JobStatus(JobState.CANCELED);
- newJobStatus.setReason("Job cancelled");
+ if (processContext.isInterrupted()) {
+ GFacUtils.handleProcessInterrupt(processContext);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * @param taskContext
+ * @param recovery
+ * @return <code>true</code> if process execution interrupted , <code>false</code> otherwise.
+ * @throws GFacException
+ */
+ private boolean outputDataStaging(TaskContext taskContext, boolean recovery) throws GFacException {
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+ TaskStatus taskStatus = executeTask(taskContext, dMoveTask, recovery);
+ if (taskStatus.getState() == TaskState.FAILED) {
+ log.error("expId: {}, processId: {}, taskId: {} type: {},:- output staging failed, " +
+ "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
+ .getParentProcessContext().getProcessId(), taskContext.getTaskId(), dMoveTask.getType
+ ().name(), taskStatus.getReason());
+
+ String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- output staging failed, " +
+ "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() +
+ taskContext.getTaskId() + dMoveTask.getType().name() + taskStatus.getReason();
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setUserFriendlyMessage("Error while staging output data");
+ errorModel.setActualErrorMessage(errorMsg);
+ GFacUtils.saveTaskError(taskContext, errorModel);
+ }
+ return false;
+ }
+
+ @Override
+ public void cancelProcess(ProcessContext processContext) throws GFacException {
+ if (processContext.getProcessState() == ProcessState.MONITORING) {
+ // get job submission task and invoke cancel
+ JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
+ TaskContext taskCtx = getJobSubmissionTaskContext(processContext);
+ executeCancel(taskCtx, jobSubmissionTask);
+ }
+ }
+
+ private TaskStatus executeTask(TaskContext taskCtx, Task task, boolean recover) throws GFacException {
+ TaskStatus taskStatus = null;
+ if (recover) {
+ taskStatus = task.recover(taskCtx);
+ } else {
+ taskStatus = task.execute(taskCtx);
+ }
+ return taskStatus;
+ }
+
+ private void executeCancel(TaskContext taskContext, JobSubmissionTask jSTask) throws GFacException {
+ try {
+ JobStatus oldJobStatus = jSTask.cancel(taskContext);
+
+ if (oldJobStatus != null && oldJobStatus.getJobState() == JobState.QUEUED) {
+ JobMonitor monitorService = Factory.getMonitorService(taskContext.getParentProcessContext().getMonitorMode());
+ monitorService.stopMonitor(taskContext.getParentProcessContext().getJobModel().getJobId(), true);
+ JobStatus newJobStatus = new JobStatus(JobState.CANCELED);
+ newJobStatus.setReason("Job cancelled");
newJobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- taskContext.getParentProcessContext().getJobModel().setJobStatus(newJobStatus);
- GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), taskContext.getParentProcessContext()
- .getJobModel());
- }
- } catch (TaskException e) {
- throw new GFacException("Error while cancelling job");
- } catch (AiravataException e) {
- throw new GFacException("Error wile getting monitoring service");
- }
- }
-
- private TaskContext getJobSubmissionTaskContext(ProcessContext processContext) throws GFacException {
- TaskContext taskCtx = new TaskContext();
- taskCtx.setParentProcessContext(processContext);
-
- TaskModel taskModel = new TaskModel();
- taskModel.setParentProcessId(processContext.getProcessId());
- taskModel.setCreationTime(new Date().getTime());
- taskModel.setLastUpdateTime(taskModel.getCreationTime());
- TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
- taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- taskModel.setTaskStatus(taskStatus);
- taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
- taskCtx.setTaskModel(taskModel);
- return taskCtx;
- }
+ taskContext.getParentProcessContext().getJobModel().setJobStatus(newJobStatus);
+ GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), taskContext.getParentProcessContext()
+ .getJobModel());
+ }
+ } catch (TaskException e) {
+ throw new GFacException("Error while cancelling job");
+ } catch (AiravataException e) {
+ throw new GFacException("Error while getting monitoring service");
+ }
+ }
- private TaskContext getDataStagingTaskContext(ProcessContext processContext, InputDataObjectType processInput)
- throws TException, TaskException {
- TaskContext taskCtx = new TaskContext();
- taskCtx.setParentProcessContext(processContext);
- // create new task model for this task
- TaskModel taskModel = new TaskModel();
- taskModel.setParentProcessId(processContext.getProcessId());
- taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
- taskModel.setLastUpdateTime(taskModel.getCreationTime());
+ private TaskContext getJobSubmissionTaskContext(ProcessContext processContext) throws GFacException {
+ TaskContext taskCtx = new TaskContext();
+ taskCtx.setParentProcessContext(processContext);
+
+ TaskModel taskModel = new TaskModel();
+ taskModel.setParentProcessId(processContext.getProcessId());
+ taskModel.setCreationTime(new Date().getTime());
+ taskModel.setLastUpdateTime(taskModel.getCreationTime());
TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
taskModel.setTaskStatus(taskStatus);
- taskModel.setTaskType(TaskTypes.DATA_STAGING);
- // create data staging sub task model
- DataStagingTaskModel submodel = new DataStagingTaskModel();
- submodel.setSource(processInput.getValue());
- ServerInfo serverInfo = processContext.getServerInfo();
- URI destination = null;
- try {
- destination = new URI(processContext.getDataMovementProtocol().name(), serverInfo.getUserName(),
- serverInfo.getHost(), serverInfo.getPort(), processContext.getWorkingDir(), null, null);
- } catch (URISyntaxException e) {
- throw new TaskException("Error while constructing destination file URI");
- }
- submodel.setDestination(destination.toString());
- taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
- taskCtx.setTaskModel(taskModel);
- taskCtx.setProcessInput(processInput);
- return taskCtx;
- }
-
- private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput)
+ taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
+ taskCtx.setTaskModel(taskModel);
+ return taskCtx;
+ }
+
+ private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput)
throws TException, TaskException {
- TaskContext taskCtx = new TaskContext();
- taskCtx.setParentProcessContext(processContext);
- // create new task model for this task
- TaskModel taskModel = new TaskModel();
- taskModel.setParentProcessId(processContext.getProcessId());
- taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
- taskModel.setLastUpdateTime(taskModel.getCreationTime());
+ TaskContext taskCtx = new TaskContext();
+ taskCtx.setParentProcessContext(processContext);
+ // create new task model for this task
+ TaskModel taskModel = new TaskModel();
+ taskModel.setParentProcessId(processContext.getProcessId());
+ taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ taskModel.setLastUpdateTime(taskModel.getCreationTime());
TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
taskModel.setTaskStatus(taskStatus);
- taskModel.setTaskType(TaskTypes.DATA_STAGING);
- // create data staging sub task model
- String remoteOutputDir = processContext.getOutputDir();
- remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
- DataStagingTaskModel submodel = new DataStagingTaskModel();
+ taskModel.setTaskType(TaskTypes.DATA_STAGING);
+ // create data staging sub task model
+ String remoteOutputDir = processContext.getOutputDir();
+ remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
+ DataStagingTaskModel submodel = new DataStagingTaskModel();
ServerInfo serverInfo = processContext.getServerInfo();
URI source = null;
try {
@@ -623,89 +533,82 @@ public class GFacEngineImpl implements GFacEngine {
submodel.setSource(source.toString());
// We don't know destination location at this time, data staging task will set this.
// because destination is required field we set dummy destination
- submodel.setDestination("dummy://temp/file/location");
- taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
- taskCtx.setTaskModel(taskModel);
+ submodel.setDestination("dummy://temp/file/location");
+ taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+ taskCtx.setTaskModel(taskModel);
taskCtx.setProcessOutput(processOutput);
- return taskCtx;
- }
-
- /**
- * Persist task model
- */
- private void saveTaskModel(TaskContext taskContext) throws GFacException {
- try {
- TaskModel taskModel = taskContext.getTaskModel();
- taskContext.getParentProcessContext().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel,
- taskModel.getParentProcessId());
- } catch (RegistryException e) {
- throw new GFacException("Error while saving task model", e);
- }
- }
-
- private TaskContext getEnvSetupTaskContext(ProcessContext processContext) {
- TaskContext taskCtx = new TaskContext();
- taskCtx.setParentProcessContext(processContext);
- TaskModel taskModel = new TaskModel();
- taskModel.setParentProcessId(processContext.getProcessId());
- taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
- taskModel.setLastUpdateTime(taskModel.getCreationTime());
- taskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
- taskModel.setTaskType(TaskTypes.ENV_SETUP);
- taskCtx.setTaskModel(taskModel);
- return taskCtx;
- }
-
-
- /**
- * Sort input data type by input order.
- */
- private void sortByInputOrder(List<InputDataObjectType> processInputs) {
- Collections.sort(processInputs, new Comparator<InputDataObjectType>() {
- @Override
- public int compare(InputDataObjectType inputDT_1, InputDataObjectType inputDT_2) {
- return inputDT_1.getInputOrder() - inputDT_2.getInputOrder();
- }
- });
- }
-
- public static ResourceJobManager getResourceJobManager(ProcessContext processCtx) throws AppCatalogException, GFacException {
- List<JobSubmissionInterface> jobSubmissionInterfaces = Factory.getDefaultAppCatalog().getComputeResource()
- .getComputeResource(processCtx.getComputeResourceId()).getJobSubmissionInterfaces();
-
- ResourceJobManager resourceJobManager = null;
- JobSubmissionInterface jsInterface = null;
- for (JobSubmissionInterface jobSubmissionInterface : jobSubmissionInterfaces) {
- if (jobSubmissionInterface.getJobSubmissionProtocol() == processCtx.getJobSubmissionProtocol()) {
- jsInterface = jobSubmissionInterface;
- break;
- }
- }
- if (jsInterface == null) {
- throw new GFacException("Job Submission interface cannot be empty at this point");
- } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
- SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
- (jsInterface.getJobSubmissionInterfaceId());
- processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process
- // context method.
- resourceJobManager = sshJobSubmission.getResourceJobManager();
- } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) {
- LOCALSubmission localSubmission = Factory.getDefaultAppCatalog().getComputeResource().getLocalJobSubmission
- (jsInterface.getJobSubmissionInterfaceId());
- resourceJobManager = localSubmission.getResourceJobManager();
- } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH_FORK) {
- SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
- (jsInterface.getJobSubmissionInterfaceId());
- processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process
- resourceJobManager = sshJobSubmission.getResourceJobManager();
- } else {
- throw new GFacException("Unsupported JobSubmissionProtocol - " + jsInterface.getJobSubmissionProtocol()
- .name());
- }
-
- if (resourceJobManager == null) {
- throw new GFacException("Resource Job Manager is empty.");
- }
- return resourceJobManager;
- }
+ return taskCtx;
+ }
+
+ /**
+ * Persist task model
+ */
+ private void saveTaskModel(TaskContext taskContext) throws GFacException {
+ try {
+ TaskModel taskModel = taskContext.getTaskModel();
+ taskContext.getParentProcessContext().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel,
+ taskModel.getParentProcessId());
+ } catch (RegistryException e) {
+ throw new GFacException("Error while saving task model", e);
+ }
+ }
+
+ private TaskContext getTaskContext(ProcessContext processContext) {
+ TaskContext taskCtx = new TaskContext();
+ taskCtx.setParentProcessContext(processContext);
+ return taskCtx;
+ }
+
+
+ /**
+ * Sort input data type by input order.
+ */
+ private void sortByInputOrder(List<InputDataObjectType> processInputs) {
+ Collections.sort(processInputs, new Comparator<InputDataObjectType>() {
+ @Override
+ public int compare(InputDataObjectType inputDT_1, InputDataObjectType inputDT_2) {
+ return inputDT_1.getInputOrder() - inputDT_2.getInputOrder();
+ }
+ });
+ }
+
+ public static ResourceJobManager getResourceJobManager(ProcessContext processCtx) throws AppCatalogException, GFacException {
+ List<JobSubmissionInterface> jobSubmissionInterfaces = Factory.getDefaultAppCatalog().getComputeResource()
+ .getComputeResource(processCtx.getComputeResourceId()).getJobSubmissionInterfaces();
+
+ ResourceJobManager resourceJobManager = null;
+ JobSubmissionInterface jsInterface = null;
+ for (JobSubmissionInterface jobSubmissionInterface : jobSubmissionInterfaces) {
+ if (jobSubmissionInterface.getJobSubmissionProtocol() == processCtx.getJobSubmissionProtocol()) {
+ jsInterface = jobSubmissionInterface;
+ break;
+ }
+ }
+ if (jsInterface == null) {
+ throw new GFacException("Job Submission interface cannot be empty at this point");
+ } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+ SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
+ (jsInterface.getJobSubmissionInterfaceId());
+ processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process
+ // context method.
+ resourceJobManager = sshJobSubmission.getResourceJobManager();
+ } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) {
+ LOCALSubmission localSubmission = Factory.getDefaultAppCatalog().getComputeResource().getLocalJobSubmission
+ (jsInterface.getJobSubmissionInterfaceId());
+ resourceJobManager = localSubmission.getResourceJobManager();
+ } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH_FORK) {
+ SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
+ (jsInterface.getJobSubmissionInterfaceId());
+ processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process
+ resourceJobManager = sshJobSubmission.getResourceJobManager();
+ } else {
+ throw new GFacException("Unsupported JobSubmissionProtocol - " + jsInterface.getJobSubmissionProtocol()
+ .name());
+ }
+
+ if (resourceJobManager == null) {
+ throw new GFacException("Resource Job Manager is empty.");
+ }
+ return resourceJobManager;
+ }
}