You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/01/13 18:03:07 UTC

[30/42] airavata git commit: Fixing issues with cancelation with recovery

Fixing issues with cancelation with recovery


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6d338d07
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6d338d07
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6d338d07

Branch: refs/heads/develop
Commit: 6d338d07f039bcc30de2d9324a8a37f268ea3b2b
Parents: 0b75afd
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Thu Jan 7 18:32:18 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Thu Jan 7 18:32:18 2016 -0500

----------------------------------------------------------------------
 .../gfac/core/context/ProcessContext.java       |  9 +++
 .../airavata/gfac/impl/GFacEngineImpl.java      | 75 +++++++++++++++++++-
 .../airavata/gfac/server/GfacServerHandler.java | 50 +++++++++----
 .../server/OrchestratorServerHandler.java       | 15 ++--
 4 files changed, 131 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/6d338d07/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 02d4cbc..2880551 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -97,6 +97,7 @@ public class ProcessContext {
     private TaskModel currentExecutingTaskModel; // current execution task model in case we pause process execution we need this to continue process exectuion again
 	private boolean acknowledge;
 	private SSHKeyAuthentication sshKeyAuthentication;
+	private boolean recoveryWithCancel = false;
 
 	/**
 	 * Note: process context property use lazy loading approach. In runtime you will see some properties as null
@@ -501,5 +502,13 @@ public class ProcessContext {
 	public void setSshKeyAuthentication(SSHKeyAuthentication sshKeyAuthentication) {
 		this.sshKeyAuthentication = sshKeyAuthentication;
 	}
+
+	public boolean isRecoveryWithCancel() {
+		return recoveryWithCancel;
+	}
+
+	public void setRecoveryWithCancel(boolean recoveryWithCancel) {
+		this.recoveryWithCancel = recoveryWithCancel;
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/6d338d07/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index dd10c12..00d920d 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -25,6 +25,7 @@ import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
@@ -61,6 +62,9 @@ import org.apache.airavata.registry.cpi.ExpCatChildDataType;
 import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
 import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.utils.Constants;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,6 +95,14 @@ public class GFacEngineImpl implements GFacEngine {
 
             ProcessModel processModel = (ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
             processContext.setProcessModel(processModel);
+
+            try {
+                checkRecoveryWithCancel(processContext);
+            } catch (Exception e) {
+                log.error("expId: {}, processId: {}, Error while checking process cancel data in zookeeper",
+                        processContext.getExperimentId(), processContext.getProcessId());
+            }
+
             GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
             processContext.setGatewayResourceProfile(gatewayProfile);
             processContext.setComputeResourcePreference(appCatalog.getGatewayProfile().getComputeResourcePreference
@@ -176,6 +188,19 @@ public class GFacEngineImpl implements GFacEngine {
         }
     }
 
+    private void checkRecoveryWithCancel(ProcessContext processContext) throws Exception {
+        CuratorFramework curatorClient = processContext.getCuratorClient();
+        String experimentId = processContext.getExperimentId();
+        String processId = processContext.getProcessId();
+        String processCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZKPaths.makePath(
+                ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId), processId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+        log.info("expId: {}, processId: {}, get process cancel data from zookeeper node {}", experimentId, processId, processCancelNodePath);
+        byte[] bytes = curatorClient.getData().forPath(processCancelNodePath);
+        if (bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) {
+            processContext.setRecoveryWithCancel(true);
+        }
+    }
+
     @Override
     public void executeProcess(ProcessContext processContext) throws GFacException {
         if (processContext.isInterrupted()) {
@@ -509,6 +534,7 @@ public class GFacEngineImpl implements GFacEngine {
         processContext.setTaskExecutionOrder(taskExecutionOrder);
         Map<String, TaskModel> taskMap = processContext.getTaskMap();
         String recoverTaskId = null;
+        String previousTaskId = null;
         TaskModel taskModel = null;
         for (String taskId : taskExecutionOrder) {
             taskModel = taskMap.get(taskId);
@@ -517,9 +543,56 @@ public class GFacEngineImpl implements GFacEngine {
                 recoverTaskId = taskId;
                 break;
             }
+            previousTaskId = taskId;
+        }
+        final String rTaskId = recoverTaskId;
+        final String pTaskId = previousTaskId;
+        if (recoverTaskId != null) {
+            if (processContext.isRecoveryWithCancel()) {
+                cancelJobSubmission(processContext, rTaskId, pTaskId);
+            }
+            continueProcess(processContext, recoverTaskId);
         }
+    }
 
-        continueProcess(processContext, recoverTaskId);
+    private void cancelJobSubmission(ProcessContext processContext, String rTaskId, String pTaskId) {
+        new Thread(() -> {
+            try {
+                processContext.setCancel(true);
+                ProcessState processState = processContext.getProcessState();
+                List<Object> jobModels = null;
+                switch (processState) {
+                    case EXECUTING:
+                        jobModels = processContext.getExperimentCatalog().get(
+                                ExperimentCatalogModelType.JOB, Constants.FieldConstants.TaskConstants.TASK_ID,
+                                rTaskId);
+                        break;
+                    case MONITORING:
+                        if (pTaskId != null) {
+                            jobModels = processContext.getExperimentCatalog().get(
+                                    ExperimentCatalogModelType.JOB, Constants.FieldConstants.TaskConstants.TASK_ID,
+                                    pTaskId);
+                        }
+                }
+
+                if (jobModels != null && !jobModels.isEmpty()) {
+                    JobModel jobModel = (JobModel) jobModels.get(jobModels.size() - 1);
+                    processContext.setJobModel(jobModel);
+                    log.info("expId: {}, processId: {}, Canceling jobId {}", processContext.getExperimentId(),
+                            processContext.getProcessId(), jobModel.getJobId());
+                    cancelProcess(processContext);
+                    log.info("expId: {}, processId: {}, Canceled jobId {}", processContext.getExperimentId(),
+                            processContext.getProcessId(), jobModel.getJobId());
+                }
+            } catch (GFacException e) {
+                log.error("expId: {}, processId: {}, Error while canceling process which is in recovery mode",
+                        processContext.getExperimentId(), processContext.getProcessId());
+            } catch (RegistryException e) {
+                log.error("expId: {}, processId: {}, Error while getting job model for taskId {}, " +
+                                "couldn't cancel process which is in recovery mode", processContext.getExperimentId(),
+                        processContext.getProcessId(), rTaskId);
+            }
+        }).start();
     }
 
     private JobModel getJobModel(ProcessContext processContext) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/6d338d07/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 10da052..c59e199 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -28,6 +28,7 @@ import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.gfac.core.GFac;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.cpi.GfacService;
@@ -247,14 +248,29 @@ public class GfacServerHandler implements GfacService.Iface {
 			                .getProcessId());
 	                publishProcessStatus(event, status);
                     try {
-	                    createProcessZKNode(curatorClient, gfacServerName, event, message);
+                        createProcessZKNode(curatorClient, gfacServerName, event, message);
                         boolean isCancel = setCancelWatcher(curatorClient, event.getExperimentId(), event.getProcessId());
-                        submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
                         if (isCancel) {
-                            // Need to trigger process cancel watcher, wait till process recover and then set zk data.
-                            Thread.sleep(10000);
-                            setCancelData(event.getExperimentId());
+                            if (status.getState() == ProcessState.STARTED) {
+                                status.setState(ProcessState.CANCELLING);
+                                status.setReason("Process Cancel is triggered");
+                                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                                Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
+                                publishProcessStatus(event, status);
+
+                                // do cancel operation here
+
+                                status.setState(ProcessState.CANCELED);
+                                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                                Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
+                                publishProcessStatus(event, status);
+                                rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
+                                return;
+                            } else {
+                                setCancelData(event.getExperimentId(),event.getProcessId());
+                            }
                         }
+                        submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
                     } catch (Exception e) {
                         log.error(e.getMessage(), e);
                         rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
@@ -295,10 +311,11 @@ public class GfacServerHandler implements GfacService.Iface {
         }
     }
 
-    private void setCancelData(String experimentId) throws Exception {
-        String expCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
-                experimentId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-        curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST
+    private void setCancelData(String experimentId, String processId) throws Exception {
+        String processCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZKPaths.makePath(
+                ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId), processId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+        log.info("expId: {}, processId: {}, set process cancel data to zookeeper node {}", experimentId, processId, processCancelNodePath);
+        curatorClient.setData().withVersion(-1).forPath(processCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST
                 .getBytes());
     }
 
@@ -307,10 +324,15 @@ public class GfacServerHandler implements GfacService.Iface {
                                      String processId) throws Exception {
 
         String experimentNodePath = GFacUtils.getExperimentNodePath(experimentId);
-        // create /experiments/{experimentId}/cancel node and set watcher for data changes
+        // /experiments/{experimentId}/cancelListener, set watcher for data changes
         String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-        byte[] bytes = curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath(experimentCancelNode);
-        return bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST);
+        byte[] bytes = curatorClient.getData().forPath(experimentCancelNode);
+        if (bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) {
+            return true;
+        } else {
+            bytes = curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath(experimentCancelNode);
+            return bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST);
+        }
 
     }
 
@@ -339,6 +361,10 @@ public class GfacServerHandler implements GfacService.Iface {
 		curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes());
 		curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher(experimentId, processId)).forPath(zkProcessNodePath);
 
+        // create /experiments/{experimentId}/{processId}/cancelListener node
+        String zkProcessCancelPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessCancelPath);
+
 		// create /experiments/{experimentId}/{processId}/deliveryTag node and set data - deliveryTag
 		String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath);

http://git-wip-us.apache.org/repos/asf/airavata/blob/6d338d07/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 9e39c47..2cda709 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -408,16 +408,21 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 
         private boolean launchSingleAppExperiment() throws TException {
             try {
-                List<String> processIds = experimentCatalog.getIds(ExperimentCatalogModelType.PROCESS, AbstractExpCatResource.ProcessConstants.EXPERIMENT_ID, experimentId);
+                List<String> processIds = experimentCatalog.getIds(ExperimentCatalogModelType.PROCESS,
+						AbstractExpCatResource.ProcessConstants.EXPERIMENT_ID, experimentId);
                 for (String processId : processIds) {
                     launchProcess(processId, airavataCredStoreToken, gatewayId);
                 }
-
-            } catch (Exception e) {
+				ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED);
+				status.setReason("submitted all processes");
+				status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+				OrchestratorUtils.updageExperimentStatus(experimentId, status);
+				log.info("expId: {}, Launched experiment ", experimentId);
+			} catch (Exception e) {
 	            ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED);
 	            status.setReason("Error while updating task status");
 	            OrchestratorUtils.updageExperimentStatus(experimentId, status);
-	            log.error(experimentId, "Error while updating task status, hence updated experiment status to " +
+	            log.error("expId: " + experimentId + ", Error while updating task status, hence updated experiment status to " +
 			            ExperimentState.FAILED, e);
 	            throw new TException(e);
             }
@@ -465,7 +470,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 										.getExperimentId());
 								if (stat.getState() == ExperimentState.CANCELING) {
 									status.setState(ExperimentState.CANCELING);
-									status.setReason("Process competed but experiment cancelling is triggered");
+									status.setReason("Process started but experiment cancelling is triggered");
 								} else {
 									status.setState(ExperimentState.EXECUTING);
 									status.setReason("process  started");