You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/11/03 20:47:29 UTC

[04/51] [abbrv] airavata git commit: AutoScheduling gfac side changes

http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 946055e..52cd395 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -21,22 +21,24 @@
 
 package org.apache.airavata.gfac.impl;
 
-import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.text.MessageFormat;
+import java.util.List;
+import java.util.Map;
 
 public class GFacWorker implements Runnable {
 
@@ -46,7 +48,7 @@ public class GFacWorker implements Runnable {
 	private String processId;
 	private String gatewayId;
 	private String tokenId;
-	private boolean runOutflow = false;
+	private boolean continueTaskFlow = false;
 
 
 	/**
@@ -61,7 +63,7 @@ public class GFacWorker implements Runnable {
 		this.tokenId = processContext.getTokenId();
 		engine = Factory.getGFacEngine();
 		this.processContext = processContext;
-		runOutflow = true;
+		continueTaskFlow = true;
 	}
 
 	/**
@@ -98,19 +100,15 @@ public class GFacWorker implements Runnable {
 				case CONFIGURING_WORKSPACE:
 				case INPUT_DATA_STAGING:
 				case EXECUTING:
-					recoverProcess();
-					break;
 				case MONITORING:
-					if (runOutflow) {
-						runProcessOutflow();
-					} else {
-						monitorProcess();
-					}
-					break;
-				case OUTPUT_DATA_STAGING:
-				case POST_PROCESSING:
-					recoverProcessOutflow();
-					break;
+                case OUTPUT_DATA_STAGING:
+                case POST_PROCESSING:
+                    if (continueTaskFlow) {
+                        continueTaskExecution();
+                    } else {
+                        recoverProcess();
+                    }
+                    break;
 				case COMPLETED:
 					completeProcess();
 					break;
@@ -160,56 +158,94 @@ public class GFacWorker implements Runnable {
 		Factory.getGfacContext().removeProcess(processContext.getProcessId());
 	}
 
-	private void recoverProcessOutflow() throws GFacException {
-		engine.recoverProcessOutflow(processContext);
-		if (processContext.isInterrupted()) {
-			return;
-		}
-		completeProcess();
-	}
+	private void continueTaskExecution() throws GFacException {
+        // checkpoint
+        if (processContext.isInterrupted()) {
+            return;
+        }
+        processContext.setPauseTaskExecution(false);
+        List<String> taskExecutionOrder = processContext.getTaskExecutionOrder();
+        String currentExecutingTaskId = processContext.getCurrentExecutingTaskId();
+        boolean found = false;
+        String nextTaskId = null;
+        for (String taskId : taskExecutionOrder) {
+            if (!found) {
+                if (taskId.equalsIgnoreCase(currentExecutingTaskId)) {
+                    found = true;
+                }
+                continue;
+            } else {
+                nextTaskId = taskId;
+                break;
+            }
+        }
+        if (nextTaskId != null) {
+            engine.continueProcess(processContext, nextTaskId);
+        }
+        // checkpoint
+        if (processContext.isInterrupted()) {
+            return;
+        }
 
-	private void runProcessOutflow() throws GFacException {
-		engine.runProcessOutflow(processContext);
-		if (processContext.isInterrupted()) {
-			return;
-		}
-		completeProcess();
-	}
+        if (processContext.isComplete()) {
+            completeProcess();
+        }
+    }
 
 	private void recoverProcess() throws GFacException {
-		engine.recoverProcess(processContext);
-		if (processContext.isInterrupted()) {
-			return;
-		}
-		monitorProcess();
-	}
+
+        String taskDag = processContext.getProcessModel().getTaskDag();
+        List<String> taskExecutionOrder = GFacUtils.parseTaskDag(taskDag);
+        Map<String, TaskModel> taskMap = processContext.getTaskMap();
+        String recoverTaskId = null;
+        for (String taskId : taskExecutionOrder) {
+            TaskModel taskModel = taskMap.get(taskId);
+            TaskState state = taskModel.getTaskStatus().getState();
+            if (state == TaskState.CREATED || state == TaskState.EXECUTING) {
+                recoverTaskId = taskId;
+                break;
+            }
+        }
+
+        engine.recoverProcess(processContext, recoverTaskId);
+        if (processContext.isInterrupted()) {
+            return;
+        }
+
+        if (processContext.isComplete()) {
+            completeProcess();
+        }
+    }
 
 	private void executeProcess() throws GFacException {
 		engine.executeProcess(processContext);
 		if (processContext.isInterrupted()) {
 			return;
 		}
-		monitorProcess();
-	}
 
-	private void monitorProcess() throws GFacException {
-		try {
-			JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode());
-			if (monitorService != null) {
-				monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
-                ProcessStatus status = new ProcessStatus(ProcessState.MONITORING);
-                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                processContext.setProcessStatus(status);
-				GFacUtils.saveAndPublishProcessStatus(processContext);
-			} else {
-				// we directly invoke outflow
-				runProcessOutflow();
-			}
-		} catch (AiravataException e) {
-			throw new GFacException("Error while retrieving moniot service", e);
-		}
+        if (processContext.isComplete()) {
+            completeProcess();
+        }
 	}
 
+//	private void monitorProcess() throws GFacException {
+//		try {
+//			JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode());
+//			if (monitorService != null) {
+//				monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
+//                ProcessStatus status = new ProcessStatus(ProcessState.MONITORING);
+//                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+//                processContext.setProcessStatus(status);
+//				GFacUtils.saveAndPublishProcessStatus(processContext);
+//			} else {
+//				// we directly invoke outflow
+//				continueTaskExecution();
+//			}
+//		} catch (AiravataException e) {
+//			throw new GFacException("Error while retrieving moniot service", e);
+//		}
+//	}
+
 	private void sendAck() {
 		try {
 			long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),

http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 359d39c..c4d4676 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -32,10 +32,6 @@ import org.apache.airavata.gfac.core.monitor.EmailParser;
 import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.gfac.core.monitor.JobStatusResult;
 import org.apache.airavata.gfac.impl.GFacWorker;
-import org.apache.airavata.gfac.monitor.email.parser.LSFEmailParser;
-import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser;
-import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser;
-import org.apache.airavata.gfac.monitor.email.parser.UGEEmailParser;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
@@ -52,7 +48,6 @@ import javax.mail.Session;
 import javax.mail.Store;
 import javax.mail.search.FlagTerm;
 import javax.mail.search.SearchTerm;
-import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -124,6 +119,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 	public void monitor(String jobId, ProcessContext processContext) {
 		log.info("[EJM]: Added monitor Id : " + jobId + " to email based monitor map");
 		jobMonitorMap.put(jobId, processContext);
+        processContext.setPauseTaskExecution(true);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index b892a00..4468a1c 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -371,7 +371,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         taskModel.setTaskStatus(taskStatus);
         taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
         JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel();
-        submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
+//        submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
         byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask);
         taskModel.setSubTaskModel(bytes);
         orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel,