You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/11/03 20:47:29 UTC
[04/51] [abbrv] airavata git commit: AutoScheduling gfac side changes
http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 946055e..52cd395 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -21,22 +21,24 @@
package org.apache.airavata.gfac.impl;
-import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.gfac.core.GFacEngine;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.monitor.JobMonitor;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.MessageFormat;
+import java.util.List;
+import java.util.Map;
public class GFacWorker implements Runnable {
@@ -46,7 +48,7 @@ public class GFacWorker implements Runnable {
private String processId;
private String gatewayId;
private String tokenId;
- private boolean runOutflow = false;
+ private boolean continueTaskFlow = false;
/**
@@ -61,7 +63,7 @@ public class GFacWorker implements Runnable {
this.tokenId = processContext.getTokenId();
engine = Factory.getGFacEngine();
this.processContext = processContext;
- runOutflow = true;
+ continueTaskFlow = true;
}
/**
@@ -98,19 +100,15 @@ public class GFacWorker implements Runnable {
case CONFIGURING_WORKSPACE:
case INPUT_DATA_STAGING:
case EXECUTING:
- recoverProcess();
- break;
case MONITORING:
- if (runOutflow) {
- runProcessOutflow();
- } else {
- monitorProcess();
- }
- break;
- case OUTPUT_DATA_STAGING:
- case POST_PROCESSING:
- recoverProcessOutflow();
- break;
+ case OUTPUT_DATA_STAGING:
+ case POST_PROCESSING:
+ if (continueTaskFlow) {
+ continueTaskExecution();
+ } else {
+ recoverProcess();
+ }
+ break;
case COMPLETED:
completeProcess();
break;
@@ -160,56 +158,94 @@ public class GFacWorker implements Runnable {
Factory.getGfacContext().removeProcess(processContext.getProcessId());
}
- private void recoverProcessOutflow() throws GFacException {
- engine.recoverProcessOutflow(processContext);
- if (processContext.isInterrupted()) {
- return;
- }
- completeProcess();
- }
+ private void continueTaskExecution() throws GFacException {
+ // checkpoint
+ if (processContext.isInterrupted()) {
+ return;
+ }
+ processContext.setPauseTaskExecution(false);
+ List<String> taskExecutionOrder = processContext.getTaskExecutionOrder();
+ String currentExecutingTaskId = processContext.getCurrentExecutingTaskId();
+ boolean found = false;
+ String nextTaskId = null;
+ for (String taskId : taskExecutionOrder) {
+ if (!found) {
+ if (taskId.equalsIgnoreCase(currentExecutingTaskId)) {
+ found = true;
+ }
+ continue;
+ } else {
+ nextTaskId = taskId;
+ break;
+ }
+ }
+ if (nextTaskId != null) {
+ engine.continueProcess(processContext, nextTaskId);
+ }
+ // checkpoint
+ if (processContext.isInterrupted()) {
+ return;
+ }
- private void runProcessOutflow() throws GFacException {
- engine.runProcessOutflow(processContext);
- if (processContext.isInterrupted()) {
- return;
- }
- completeProcess();
- }
+ if (processContext.isComplete()) {
+ completeProcess();
+ }
+ }
private void recoverProcess() throws GFacException {
- engine.recoverProcess(processContext);
- if (processContext.isInterrupted()) {
- return;
- }
- monitorProcess();
- }
+
+ String taskDag = processContext.getProcessModel().getTaskDag();
+ List<String> taskExecutionOrder = GFacUtils.parseTaskDag(taskDag);
+ Map<String, TaskModel> taskMap = processContext.getTaskMap();
+ String recoverTaskId = null;
+ for (String taskId : taskExecutionOrder) {
+ TaskModel taskModel = taskMap.get(taskId);
+ TaskState state = taskModel.getTaskStatus().getState();
+ if (state == TaskState.CREATED || state == TaskState.EXECUTING) {
+ recoverTaskId = taskId;
+ break;
+ }
+ }
+
+ engine.recoverProcess(processContext, recoverTaskId);
+ if (processContext.isInterrupted()) {
+ return;
+ }
+
+ if (processContext.isComplete()) {
+ completeProcess();
+ }
+ }
private void executeProcess() throws GFacException {
engine.executeProcess(processContext);
if (processContext.isInterrupted()) {
return;
}
- monitorProcess();
- }
- private void monitorProcess() throws GFacException {
- try {
- JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode());
- if (monitorService != null) {
- monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
- ProcessStatus status = new ProcessStatus(ProcessState.MONITORING);
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processContext.setProcessStatus(status);
- GFacUtils.saveAndPublishProcessStatus(processContext);
- } else {
- // we directly invoke outflow
- runProcessOutflow();
- }
- } catch (AiravataException e) {
- throw new GFacException("Error while retrieving moniot service", e);
- }
+ if (processContext.isComplete()) {
+ completeProcess();
+ }
}
+// private void monitorProcess() throws GFacException {
+// try {
+// JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode());
+// if (monitorService != null) {
+// monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
+// ProcessStatus status = new ProcessStatus(ProcessState.MONITORING);
+// status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+// processContext.setProcessStatus(status);
+// GFacUtils.saveAndPublishProcessStatus(processContext);
+// } else {
+// // we directly invoke outflow
+// continueTaskExecution();
+// }
+// } catch (AiravataException e) {
+// throw new GFacException("Error while retrieving moniot service", e);
+// }
+// }
+
private void sendAck() {
try {
long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),
http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 359d39c..c4d4676 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -32,10 +32,6 @@ import org.apache.airavata.gfac.core.monitor.EmailParser;
import org.apache.airavata.gfac.core.monitor.JobMonitor;
import org.apache.airavata.gfac.core.monitor.JobStatusResult;
import org.apache.airavata.gfac.impl.GFacWorker;
-import org.apache.airavata.gfac.monitor.email.parser.LSFEmailParser;
-import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser;
-import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser;
-import org.apache.airavata.gfac.monitor.email.parser.UGEEmailParser;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobState;
@@ -52,7 +48,6 @@ import javax.mail.Session;
import javax.mail.Store;
import javax.mail.search.FlagTerm;
import javax.mail.search.SearchTerm;
-import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -124,6 +119,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
public void monitor(String jobId, ProcessContext processContext) {
log.info("[EJM]: Added monitor Id : " + jobId + " to email based monitor map");
jobMonitorMap.put(jobId, processContext);
+ processContext.setPauseTaskExecution(true);
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index b892a00..4468a1c 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -371,7 +371,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
taskModel.setTaskStatus(taskStatus);
taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel();
- submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
+// submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask);
taskModel.setSubTaskModel(bytes);
orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel,