You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/11/03 20:48:07 UTC
[42/51] [abbrv] airavata git commit: Completed process recovery
Completed process recovery
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b57a5015
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b57a5015
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b57a5015
Branch: refs/heads/master
Commit: b57a5015d047ac032636772cbb8f7dd9184035cd
Parents: 72fb57b
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Oct 23 14:22:46 2015 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Oct 23 14:22:46 2015 -0400
----------------------------------------------------------------------
.../apache/airavata/gfac/core/GFacEngine.java | 2 +-
.../gfac/core/context/ProcessContext.java | 19 ++++++---
.../airavata/gfac/core/monitor/JobMonitor.java | 6 +--
.../airavata/gfac/impl/GFacEngineImpl.java | 30 +++++++++++---
.../apache/airavata/gfac/impl/GFacWorker.java | 2 +-
.../gfac/monitor/email/EmailBasedMonitor.java | 43 ++++++++++++--------
6 files changed, 67 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/b57a5015/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
index 1d3ad30..78a6278 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
@@ -29,7 +29,7 @@ public interface GFacEngine {
public void executeProcess(ProcessContext processContext) throws GFacException ;
- public void recoverProcess(ProcessContext processContext, String taskId) throws GFacException;
+ public void recoverProcess(ProcessContext processContext) throws GFacException;
public void continueProcess(ProcessContext processContext, String taskId) throws GFacException ;
http://git-wip-us.apache.org/repos/asf/airavata/blob/b57a5015/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 47a7430..1a3a236 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -85,10 +85,10 @@ public class ProcessContext {
private List<String> taskExecutionOrder;
private List<TaskModel> taskList;
private Map<String, TaskModel> taskMap;
- private String currentExecutingTaskId; // TaskId of current executing task.
private boolean pauseTaskExecution = false; // Task can pause task execution by setting this value
private boolean complete = false; // all tasks executed?
private boolean recovery = false; // is process in recovery mode?
+ private TaskModel currentExecutingTaskModel; // current execution task model in case we pause process execution we need this to continue process execution again
/**
* Note: process context property use lazy loading approach. In runtime you will see some properties as null
@@ -416,11 +416,10 @@ public class ProcessContext {
}
public String getCurrentExecutingTaskId() {
- return currentExecutingTaskId;
- }
-
- public void setCurrentExecutingTaskId(String currentExecutingTaskId) {
- this.currentExecutingTaskId = currentExecutingTaskId;
+ if (currentExecutingTaskModel != null) {
+ return currentExecutingTaskModel.getTaskId();
+ }
+ return null;
}
public boolean isPauseTaskExecution() {
@@ -446,4 +445,12 @@ public class ProcessContext {
public void setRecovery(boolean recovery) {
this.recovery = recovery;
}
+
+ public TaskModel getCurrentExecutingTaskModel() {
+ return currentExecutingTaskModel;
+ }
+
+ public void setCurrentExecutingTaskModel(TaskModel currentExecutingTaskModel) {
+ this.currentExecutingTaskModel = currentExecutingTaskModel;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b57a5015/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
index a3f62cf..4b2ecb2 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
@@ -20,16 +20,16 @@
*/
package org.apache.airavata.gfac.core.monitor;
-import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
public interface JobMonitor {
/**
* Start monitor jobId on remote computer resource.
* @param jobId
- * @param processContext
+ * @param taskContext
*/
- void monitor(String jobId, ProcessContext processContext);
+ void monitor(String jobId, TaskContext taskContext);
/**
* Stop monitoring for given jobId
http://git-wip-us.apache.org/repos/asf/airavata/blob/b57a5015/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 76e5ae2..4b67ffd 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -184,11 +184,11 @@ public class GFacEngineImpl implements GFacEngine {
}
TaskModel taskModel = taskMap.get(taskId);
+ processContext.setCurrentExecutingTaskModel(taskModel);
TaskTypes taskType = taskModel.getTaskType();
TaskContext taskContext = getTaskContext(processContext);
taskContext.setTaskModel(taskModel);
ProcessStatus status = null;
- processContext.setCurrentExecutingTaskId(taskId);
switch (taskType) {
case ENV_SETUP:
status = new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE);
@@ -276,7 +276,7 @@ public class GFacEngineImpl implements GFacEngine {
}
if (processContext.isPauseTaskExecution()) {
- return; // If any task put processContext to wait, the same task should continue processContext execution.
+ return; // If any task put processContext to wait, the same task must continue processContext execution.
}
}
@@ -285,7 +285,6 @@ public class GFacEngineImpl implements GFacEngine {
private void executeJobMonitoring(TaskContext taskContext, boolean recovery) throws GFacException {
ProcessContext processContext = taskContext.getParentProcessContext();
- ProcessStatus status;
TaskStatus taskStatus;
JobMonitor monitorService = null;
try {
@@ -297,7 +296,9 @@ public class GFacEngineImpl implements GFacEngine {
MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel());
monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
if (!monitorService.isMonitoring(processContext.getJobModel().getJobId())) {
- monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
+ monitorService.monitor(processContext.getJobModel().getJobId(), taskContext);
+ } else {
+ log.warn("Jobid: {}, already in monitoring map", processContext.getJobModel().getJobId());
}
} catch (AiravataException | TException e) {
taskStatus = new TaskStatus(TaskState.FAILED);
@@ -316,6 +317,10 @@ public class GFacEngineImpl implements GFacEngine {
GFacUtils.saveTaskError(taskContext, errorModel);
throw new GFacException(e);
}
+ if (processContext.isPauseTaskExecution()) {
+ // we won't update task status to complete, job monitor will update task status to complete after it completes monitoring for this job id.
+ return;
+ }
taskStatus = new TaskStatus(TaskState.COMPLETED);
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
taskStatus.setReason("Successfully handed over job id to job monitor service.");
@@ -432,10 +437,23 @@ public class GFacEngineImpl implements GFacEngine {
return false;
}
-
@Override
- public void recoverProcess(ProcessContext processContext, String recoverTaskId) throws GFacException {
+ public void recoverProcess(ProcessContext processContext) throws GFacException {
processContext.setRecovery(true);
+ String taskDag = processContext.getProcessModel().getTaskDag();
+ List<String> taskExecutionOrder = GFacUtils.parseTaskDag(taskDag);
+ processContext.setTaskExecutionOrder(taskExecutionOrder);
+ Map<String, TaskModel> taskMap = processContext.getTaskMap();
+ String recoverTaskId = null;
+ for (String taskId : taskExecutionOrder) {
+ TaskModel taskModel = taskMap.get(taskId);
+ TaskState state = taskModel.getTaskStatus().getState();
+ if (state == TaskState.CREATED || state == TaskState.EXECUTING) {
+ recoverTaskId = taskId;
+ break;
+ }
+ }
+
continueProcess(processContext, recoverTaskId);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b57a5015/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index c71c8e7..970cbf0 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -208,7 +208,7 @@ public class GFacWorker implements Runnable {
}
}
- engine.recoverProcess(processContext, recoverTaskId);
+ engine.recoverProcess(processContext);
if (processContext.isInterrupted()) {
return;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b57a5015/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index b3a1398..f983d63 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -28,6 +28,7 @@ import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.config.ResourceConfig;
import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.monitor.EmailParser;
import org.apache.airavata.gfac.core.monitor.JobMonitor;
import org.apache.airavata.gfac.core.monitor.JobStatusResult;
@@ -36,6 +37,8 @@ import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerTy
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobState;
import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +71,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
private Store store;
private Folder emailFolder;
private Properties properties;
- private Map<String, ProcessContext> jobMonitorMap = new ConcurrentHashMap<>();
+ private Map<String, TaskContext> jobMonitorMap = new ConcurrentHashMap<>();
private String host, emailAddress, password, storeProtocol, folderName ;
private Date monitorStartDate;
private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>();
@@ -116,18 +119,18 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
}
@Override
- public void monitor(String jobId, ProcessContext processContext) {
+ public void monitor(String jobId, TaskContext taskContext) {
log.info("[EJM]: Added monitor Id : " + jobId + " to email based monitor map");
- jobMonitorMap.put(jobId, processContext);
- processContext.setPauseTaskExecution(true);
+ jobMonitorMap.put(jobId, taskContext);
+ taskContext.getParentProcessContext().setPauseTaskExecution(true);
}
@Override
public void stopMonitor(String jobId, boolean runOutflow) {
- ProcessContext processContext = jobMonitorMap.remove(jobId);
- if (processContext != null && runOutflow) {
+ TaskContext taskContext = jobMonitorMap.remove(jobId);
+ if (taskContext != null && runOutflow) {
try {
- GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(processContext));
+ GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(taskContext.getParentProcessContext()));
} catch (GFacException e) {
log.info("[EJM]: Error while running output tasks", e);
}
@@ -238,12 +241,12 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
for (Message message : searchMessages) {
try {
JobStatusResult jobStatusResult = parse(message);
- ProcessContext processContext = jobMonitorMap.get(jobStatusResult.getJobId());
- if (processContext == null) {
- processContext = jobMonitorMap.get(jobStatusResult.getJobName());
+ TaskContext taskContext = jobMonitorMap.get(jobStatusResult.getJobId());
+ if (taskContext == null) {
+ taskContext = jobMonitorMap.get(jobStatusResult.getJobName());
}
- if (processContext != null) {
- process(jobStatusResult, processContext);
+ if (taskContext != null) {
+ process(jobStatusResult, taskContext);
processedMessages.add(message);
} else {
// we can get JobExecutionContext null in multiple Gfac instances environment,
@@ -294,12 +297,12 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
}
}
- private void process(JobStatusResult jobStatusResult, ProcessContext processContext){
+ private void process(JobStatusResult jobStatusResult, TaskContext taskContext){
JobState resultState = jobStatusResult.getState();
// TODO : update job state on process context
boolean runOutflowTasks = false;
JobStatus jobStatus = new JobStatus();
- JobModel jobModel = processContext.getJobModel();
+ JobModel jobModel = taskContext.getParentProcessContext().getJobModel();
String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
// TODO - Handle all other valid JobStates
if (resultState == JobState.COMPLETE) {
@@ -340,18 +343,22 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
try {
jobModel.setJobStatus(jobStatus);
log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
- GFacUtils.saveJobStatus(processContext, jobModel);
+ GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
} catch (GFacException e) {
log.error("expId: {}, processId: {}, taskId: {}, jobId: {} :- Error while save and publishing Job " +
- "status {}", processContext.getExperimentId(), processContext.getProcessId(), jobModel
- .getTaskId(), jobModel.getJobId(), jobStatus.getJobState());
+ "status {}", taskContext.getExperimentId(), taskContext.getProcessId(), jobModel
+ .getTaskId(), jobModel.getJobId(), jobStatus.getJobState());
}
}
if (runOutflowTasks) {
log.info("[EJM]: Calling Out Handler chain of " + jobDetails);
try {
- GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(processContext));
+ TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskStatus.setReason("Job monitoring completed with final state: " + TaskState.COMPLETED.name());
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+ GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(taskContext.getParentProcessContext()));
} catch (GFacException e) {
log.info("[EJM]: Error while running output tasks", e);
}