You are viewing a plain-text version of this content. The canonical version is available at the original archive link (the hyperlink was not preserved in this plain-text copy).
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/25 02:41:52 UTC

[airavata] branch develop updated: Implementing job level cancellation

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5f182f0  Implementing job level cancellation
5f182f0 is described below

commit 5f182f0704c08f72e51204bccc037a51d87c267b
Author: dimuthu <di...@gmail.com>
AuthorDate: Sat Mar 24 22:41:46 2018 -0400

    Implementing job level cancellation
---
 .../helix/impl/participant/GlobalParticipant.java  |   4 +-
 .../impl/task/cancel/CancelCompletingTask.java     |  29 ++++++
 .../task/cancel/RemoteJobCancellationTask.java     | 107 ++++++++++++++++++++-
 .../impl/task/cancel/WorkflowCancellationTask.java |  27 +++++-
 .../helix/impl/workflow/PostWorkflowManager.java   |   9 +-
 .../helix/impl/workflow/PreWorkflowManager.java    |  60 ++++++++++--
 6 files changed, 215 insertions(+), 21 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index f0a5c23..0a6c7a4 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -44,7 +44,9 @@ public class GlobalParticipant extends HelixParticipant {
         "org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask",
         "org.apache.airavata.helix.impl.task.submission.LocalJobSubmissionTask",
         "org.apache.airavata.helix.impl.task.staging.ArchiveTask",
-        "org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask"
+        "org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask",
+        "org.apache.airavata.helix.impl.task.cancel.RemoteJobCancellationTask",
+        "org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask"
     };
 
     public Map<String, TaskFactory> getTaskFactory() {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
new file mode 100644
index 0000000..f5b9f1e
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
@@ -0,0 +1,29 @@
+package org.apache.airavata.helix.impl.task.cancel;
+
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.helix.impl.task.completing.CompletingTask;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+@TaskDef(name = "Cancel Completing Task")
+public class CancelCompletingTask extends AiravataTask {
+    private static final Logger logger = LogManager.getLogger(CancelCompletingTask.class);
+
+    @Override
+    public TaskResult onRun(TaskHelper helper, TaskContext taskContext) {
+        logger.info("Starting cancel completing task for task " + getTaskId() + ", experiment id " + getExperimentId());
+        logger.info("Process " + getProcessId() + " successfully cancelled");
+        saveAndPublishProcessStatus(ProcessState.CANCELED);
+        return onSuccess("Process " + getProcessId() + " successfully cancelled");
+    }
+
+    @Override
+    public void onCancel(TaskContext taskContext) {
+
+    }
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
index 2cfb317..e1495cd 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
@@ -1,20 +1,117 @@
 package org.apache.airavata.helix.impl.task.cancel;
 
-import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.helix.HelixManager;
 import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
 
 @TaskDef(name = "Remote Job Cancellation Task")
-public class RemoteJobCancellationTask extends AbstractTask {
+public class RemoteJobCancellationTask extends AiravataTask {
+
+    private static final Logger logger = LogManager.getLogger(RemoteJobCancellationTask.class);
+
+    private CuratorFramework curatorClient = null;
 
     @Override
-    public TaskResult onRun(TaskHelper helper) {
-        return null;
+    public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
+        super.init(manager, workflowName, jobName, taskName);
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        try {
+            this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
+            this.curatorClient.start();
+        } catch (ApplicationSettingsException e) {
+            logger.error("Failed to create curator client ", e);
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
-    public void onCancel() {
+    public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
+        try {
+
+            List<String> jobs = getJobsOfProcess(getProcessId());
+
+            logger.info("Fetching jobs for process " + getProcessId());
+
+            if (jobs == null || jobs.size() == 0) {
+                return onSuccess("Can not find running jobs for process " + getProcessId());
+            }
+
+            logger.info("Found " + jobs.size() + " jobs for process");
+
+            logger.info("Fetching job manager configuration for process " + getProcessId());
+
+            JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
+                    getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface()));
+
+            for (String jobId : jobs) {
+
+                try {
+                    logger.info("Cancelling job " + jobId + " of process " + getProcessId());
+                    RawCommandInfo cancelCommand = jobManagerConfiguration.getCancelCommand(jobId);
+
+                    logger.info("Command to cancel the job " + jobId + " : " + cancelCommand.getRawCommand());
+
+                    AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+                            getTaskContext().getGatewayId(),
+                            getTaskContext().getComputeResourceId(),
+                            getTaskContext().getJobSubmissionProtocol().name(),
+                            getTaskContext().getComputeResourceCredentialToken(),
+                            getTaskContext().getComputeResourceLoginUserName());
+
+                    logger.info("Running cancel command on compute host");
+                    CommandOutput commandOutput = adaptor.executeCommand(cancelCommand.getRawCommand(), null);
+
+                    if (commandOutput.getExitCode() != 0) {
+                        logger.error("Failed to execute job cancellation command for job " + jobId + " Sout : " +
+                                commandOutput.getStdOut() + ", Serr : " + commandOutput.getStdError());
+                        return onFail("Failed to execute job cancellation command for job " + jobId + " Sout : " +
+                                commandOutput.getStdOut() + ", Serr : " + commandOutput.getStdError(), true, null);
+                    }
+                } catch (Exception ex) {
+                    logger.error("Unknown error while canceling job " + jobId + " of process " + getProcessId());
+                    return onFail("Unknown error while canceling job " + jobId + " of process " + getProcessId(), true, ex);
+                }
+            }
+
+            logger.info("Successfully completed job cancellation task");
+            return onSuccess("Successfully completed job cancellation task");
+
+        } catch (Exception e) {
+            logger.error("Unknown error while canceling jobs of process " + getProcessId());
+            return onFail("Unknown error while canceling jobs of process " + getProcessId(), true, e);
+        }
+
+    }
+
+    @Override
+    public void onCancel(TaskContext taskContext) {
+
+    }
 
+    private List<String> getJobsOfProcess(String processId) throws Exception {
+        String path = "/registry/" + processId + "/jobs";
+        if (this.curatorClient.checkExists().forPath(path) != null) {
+            return this.curatorClient.getChildren().forPath(path);
+        } else {
+            return null;
+        }
     }
 }
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
index 3d1b03d..2abca2c 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
@@ -10,6 +10,7 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -23,6 +24,9 @@ public class WorkflowCancellationTask extends AbstractTask {
     @TaskParam(name = "Cancelling Workflow")
     private String cancellingWorkflowName;
 
+    @TaskParam(name = "Waiting time to monitor status (s)")
+    private int waitTime = 20;
+
     @Override
     public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
         super.init(manager, workflowName, jobName, taskName);
@@ -50,9 +54,22 @@ public class WorkflowCancellationTask extends AbstractTask {
     @Override
     public TaskResult onRun(TaskHelper helper) {
         logger.info("Cancelling workflow " + cancellingWorkflowName);
+
+        if (taskDriver.getWorkflowConfig(cancellingWorkflowName) == null) {
+            return onFail("Can not find a workflow with name " + cancellingWorkflowName, true);
+        }
         try {
+
+            TaskState workflowState = taskDriver.getWorkflowContext(cancellingWorkflowName).getWorkflowState();
+            logger.info("Current state of workflow " + cancellingWorkflowName + " : " + workflowState.name());
+
             taskDriver.stop(cancellingWorkflowName);
-            logger.info("Workflow " + cancellingWorkflowName + " cancelled");
+
+            logger.info("Waiting maximum " + waitTime +"s for workflow " + cancellingWorkflowName + " state to change");
+            TaskState newWorkflowState = taskDriver.pollForWorkflowState(cancellingWorkflowName, waitTime * 1000, TaskState.COMPLETED, TaskState.FAILED,
+                    TaskState.STOPPED, TaskState.ABORTED, TaskState.NOT_STARTED);
+
+            logger.info("Workflow " + cancellingWorkflowName + " state changed to " + newWorkflowState.name());
             return onSuccess("Successfully cancelled workflow " + cancellingWorkflowName);
         } catch (Exception e) {
             logger.error("Failed to stop workflow " + cancellingWorkflowName, e);
@@ -72,4 +89,12 @@ public class WorkflowCancellationTask extends AbstractTask {
     public void setCancellingWorkflowName(String cancellingWorkflowName) {
         this.cancellingWorkflowName = cancellingWorkflowName;
     }
+
+    public int getWaitTime() {
+        return waitTime;
+    }
+
+    public void setWaitTime(int waitTime) {
+        this.waitTime = waitTime;
+    }
 }
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 5c0df31..b112e81 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -175,17 +175,16 @@ public class PostWorkflowManager {
 
                 String status = getStatusByProcess(processId);
 
+                logger.info("Updating the job status for job id : " + jobStatusResult.getJobId() + " with process id "
+                        + processId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
+                saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState());
+
                 // TODO get cluster lock before that
                 if ("cancel".equals(status)) {
                     logger.info("Cancelled post workflow for process " + processId);
                     // TODO to be implemented
                 } else {
 
-                    logger.info("Updating the job status for job id : " + jobStatusResult.getJobId() + " with process id "
-                            + processId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
-
-                    saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState());
-
                     if (jobStatusResult.getState() == JobState.COMPLETE) {
 
                         logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id "
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index ff570ae..930efff 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -26,6 +26,8 @@ import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.helix.core.AbstractTask;
 import org.apache.airavata.helix.core.OutPort;
 import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask;
+import org.apache.airavata.helix.impl.task.cancel.RemoteJobCancellationTask;
 import org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask;
 import org.apache.airavata.helix.impl.task.env.EnvSetupTask;
 import org.apache.airavata.helix.impl.task.staging.InputDataStagingTask;
@@ -91,7 +93,12 @@ public class PreWorkflowManager {
     }
 
     private List<String> getWorkflowsOfProcess(String processId) throws Exception {
-        return this.curatorClient.getChildren().forPath("/registry/" + processId + "/workflows");
+        String path = "/registry/" + processId + "/workflows";
+        if (this.curatorClient.checkExists().forPath(path) != null) {
+            return this.curatorClient.getChildren().forPath(path);
+        } else {
+            return null;
+        }
     }
 
     private String createAndLaunchPreWorkflow(String processId, String gateway) throws Exception {
@@ -155,6 +162,12 @@ public class PreWorkflowManager {
     }
 
     private String createAndLaunchCancelWorkflow(String processId, String gateway) throws Exception {
+
+        ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway);
+        ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
+
+        String experimentId = processModel.getExperimentId();
+
         registerCancelProcess(processId);
         List<String> workflows = getWorkflowsOfProcess(processId);
         final List<AbstractTask> allTasks = new ArrayList<>();
@@ -164,21 +177,50 @@ public class PreWorkflowManager {
                 WorkflowCancellationTask wfct = new WorkflowCancellationTask();
                 wfct.setTaskId(UUID.randomUUID().toString());
                 wfct.setCancellingWorkflowName(wf);
+
+                if (allTasks.size() > 0) {
+                    allTasks.get(allTasks.size() -1).setNextTask(new OutPort(wfct.getTaskId(), wfct));
+                }
                 allTasks.add(wfct);
             }
 
-            WorkflowManager workflowManager = new WorkflowManager(
-                    ServerSettings.getSetting("helix.cluster.name"),
-                    ServerSettings.getSetting("post.workflow.manager.name"),
-                    ServerSettings.getZookeeperConnection());
-
-            String workflow = workflowManager.launchWorkflow(processId + "-CANCEL-" + UUID.randomUUID().toString(), allTasks, true, false);
-            logger.info("Started launching workflow " + workflow + " to cancel process " + processId);
-            return workflow;
         } else {
             logger.info("No workflow registered with process " + processId + " to cancel");
             return null;
         }
+
+        RemoteJobCancellationTask rjct = new RemoteJobCancellationTask();
+        rjct.setTaskId(UUID.randomUUID().toString());
+        rjct.setExperimentId(experimentId);
+        rjct.setProcessId(processId);
+        rjct.setGatewayId(gateway);
+        rjct.setSkipTaskStatusPublish(true);
+
+        if (allTasks.size() > 0) {
+            allTasks.get(allTasks.size() -1).setNextTask(new OutPort(rjct.getTaskId(), rjct));
+        }
+        allTasks.add(rjct);
+
+        CancelCompletingTask cct = new CancelCompletingTask();
+        cct.setTaskId(UUID.randomUUID().toString());
+        cct.setExperimentId(experimentId);
+        cct.setProcessId(processId);
+        cct.setGatewayId(gateway);
+        cct.setSkipTaskStatusPublish(true);
+
+        if (allTasks.size() > 0) {
+            allTasks.get(allTasks.size() -1).setNextTask(new OutPort(cct.getTaskId(), cct));
+        }
+        allTasks.add(cct);
+
+        WorkflowManager workflowManager = new WorkflowManager(
+                ServerSettings.getSetting("helix.cluster.name"),
+                ServerSettings.getSetting("post.workflow.manager.name"),
+                ServerSettings.getZookeeperConnection());
+
+        String workflow = workflowManager.launchWorkflow(processId + "-CANCEL-" + UUID.randomUUID().toString(), allTasks, true, false);
+        logger.info("Started launching workflow " + workflow + " to cancel process " + processId);
+        return workflow;
     }
 
     public static void main(String[] args) throws Exception {

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.