You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/25 02:41:52 UTC
[airavata] branch develop updated: Implementing job level cancellation
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new 5f182f0 Implementing job level cancellation
5f182f0 is described below
commit 5f182f0704c08f72e51204bccc037a51d87c267b
Author: dimuthu <di...@gmail.com>
AuthorDate: Sat Mar 24 22:41:46 2018 -0400
Implementing job level cancellation
---
.../helix/impl/participant/GlobalParticipant.java | 4 +-
.../impl/task/cancel/CancelCompletingTask.java | 29 ++++++
.../task/cancel/RemoteJobCancellationTask.java | 107 ++++++++++++++++++++-
.../impl/task/cancel/WorkflowCancellationTask.java | 27 +++++-
.../helix/impl/workflow/PostWorkflowManager.java | 9 +-
.../helix/impl/workflow/PreWorkflowManager.java | 60 ++++++++++--
6 files changed, 215 insertions(+), 21 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index f0a5c23..0a6c7a4 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -44,7 +44,9 @@ public class GlobalParticipant extends HelixParticipant {
"org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask",
"org.apache.airavata.helix.impl.task.submission.LocalJobSubmissionTask",
"org.apache.airavata.helix.impl.task.staging.ArchiveTask",
- "org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask"
+ "org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask",
+ "org.apache.airavata.helix.impl.task.cancel.RemoteJobCancellationTask",
+ "org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask"
};
public Map<String, TaskFactory> getTaskFactory() {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
new file mode 100644
index 0000000..f5b9f1e
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
@@ -0,0 +1,29 @@
+package org.apache.airavata.helix.impl.task.cancel;
+
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.helix.impl.task.completing.CompletingTask;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+@TaskDef(name = "Cancel Completing Task")
+public class CancelCompletingTask extends AiravataTask {
+ private static final Logger logger = LogManager.getLogger(CancelCompletingTask.class);
+
+ @Override
+ public TaskResult onRun(TaskHelper helper, TaskContext taskContext) {
+ logger.info("Starting cancel completing task for task " + getTaskId() + ", experiment id " + getExperimentId());
+ logger.info("Process " + getProcessId() + " successfully cancelled");
+ saveAndPublishProcessStatus(ProcessState.CANCELED);
+ return onSuccess("Process " + getProcessId() + " successfully cancelled");
+ }
+
+ @Override
+ public void onCancel(TaskContext taskContext) {
+
+ }
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
index 2cfb317..e1495cd 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
@@ -1,20 +1,117 @@
package org.apache.airavata.helix.impl.task.cancel;
-import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.helix.HelixManager;
import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
@TaskDef(name = "Remote Job Cancellation Task")
-public class RemoteJobCancellationTask extends AbstractTask {
+public class RemoteJobCancellationTask extends AiravataTask {
+
+ private static final Logger logger = LogManager.getLogger(RemoteJobCancellationTask.class);
+
+ private CuratorFramework curatorClient = null;
@Override
- public TaskResult onRun(TaskHelper helper) {
- return null;
+ public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
+ super.init(manager, workflowName, jobName, taskName);
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ try {
+ this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
+ this.curatorClient.start();
+ } catch (ApplicationSettingsException e) {
+ logger.error("Failed to create curator client ", e);
+ throw new RuntimeException(e);
+ }
}
@Override
- public void onCancel() {
+ public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
+ try {
+
+ List<String> jobs = getJobsOfProcess(getProcessId());
+
+ logger.info("Fetching jobs for process " + getProcessId());
+
+ if (jobs == null || jobs.size() == 0) {
+ return onSuccess("Can not find running jobs for process " + getProcessId());
+ }
+
+ logger.info("Found " + jobs.size() + " jobs for process");
+
+ logger.info("Fetching job manager configuration for process " + getProcessId());
+
+ JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
+ getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface()));
+
+ for (String jobId : jobs) {
+
+ try {
+ logger.info("Cancelling job " + jobId + " of process " + getProcessId());
+ RawCommandInfo cancelCommand = jobManagerConfiguration.getCancelCommand(jobId);
+
+ logger.info("Command to cancel the job " + jobId + " : " + cancelCommand.getRawCommand());
+
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getGatewayId(),
+ getTaskContext().getComputeResourceId(),
+ getTaskContext().getJobSubmissionProtocol().name(),
+ getTaskContext().getComputeResourceCredentialToken(),
+ getTaskContext().getComputeResourceLoginUserName());
+
+ logger.info("Running cancel command on compute host");
+ CommandOutput commandOutput = adaptor.executeCommand(cancelCommand.getRawCommand(), null);
+
+ if (commandOutput.getExitCode() != 0) {
+ logger.error("Failed to execute job cancellation command for job " + jobId + " Sout : " +
+ commandOutput.getStdOut() + ", Serr : " + commandOutput.getStdError());
+ return onFail("Failed to execute job cancellation command for job " + jobId + " Sout : " +
+ commandOutput.getStdOut() + ", Serr : " + commandOutput.getStdError(), true, null);
+ }
+ } catch (Exception ex) {
+ logger.error("Unknown error while canceling job " + jobId + " of process " + getProcessId());
+ return onFail("Unknown error while canceling job " + jobId + " of process " + getProcessId(), true, ex);
+ }
+ }
+
+ logger.info("Successfully completed job cancellation task");
+ return onSuccess("Successfully completed job cancellation task");
+
+ } catch (Exception e) {
+ logger.error("Unknown error while canceling jobs of process " + getProcessId());
+ return onFail("Unknown error while canceling jobs of process " + getProcessId(), true, e);
+ }
+
+ }
+
+ @Override
+ public void onCancel(TaskContext taskContext) {
+
+ }
+ private List<String> getJobsOfProcess(String processId) throws Exception {
+ String path = "/registry/" + processId + "/jobs";
+ if (this.curatorClient.checkExists().forPath(path) != null) {
+ return this.curatorClient.getChildren().forPath(path);
+ } else {
+ return null;
+ }
}
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
index 3d1b03d..2abca2c 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
@@ -10,6 +10,7 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -23,6 +24,9 @@ public class WorkflowCancellationTask extends AbstractTask {
@TaskParam(name = "Cancelling Workflow")
private String cancellingWorkflowName;
+ @TaskParam(name = "Waiting time to monitor status (s)")
+ private int waitTime = 20;
+
@Override
public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
super.init(manager, workflowName, jobName, taskName);
@@ -50,9 +54,22 @@ public class WorkflowCancellationTask extends AbstractTask {
@Override
public TaskResult onRun(TaskHelper helper) {
logger.info("Cancelling workflow " + cancellingWorkflowName);
+
+ if (taskDriver.getWorkflowConfig(cancellingWorkflowName) == null) {
+ return onFail("Can not find a workflow with name " + cancellingWorkflowName, true);
+ }
try {
+
+ TaskState workflowState = taskDriver.getWorkflowContext(cancellingWorkflowName).getWorkflowState();
+ logger.info("Current state of workflow " + cancellingWorkflowName + " : " + workflowState.name());
+
taskDriver.stop(cancellingWorkflowName);
- logger.info("Workflow " + cancellingWorkflowName + " cancelled");
+
+ logger.info("Waiting maximum " + waitTime +"s for workflow " + cancellingWorkflowName + " state to change");
+ TaskState newWorkflowState = taskDriver.pollForWorkflowState(cancellingWorkflowName, waitTime * 1000, TaskState.COMPLETED, TaskState.FAILED,
+ TaskState.STOPPED, TaskState.ABORTED, TaskState.NOT_STARTED);
+
+ logger.info("Workflow " + cancellingWorkflowName + " state changed to " + newWorkflowState.name());
return onSuccess("Successfully cancelled workflow " + cancellingWorkflowName);
} catch (Exception e) {
logger.error("Failed to stop workflow " + cancellingWorkflowName, e);
@@ -72,4 +89,12 @@ public class WorkflowCancellationTask extends AbstractTask {
public void setCancellingWorkflowName(String cancellingWorkflowName) {
this.cancellingWorkflowName = cancellingWorkflowName;
}
+
+ public int getWaitTime() {
+ return waitTime;
+ }
+
+ public void setWaitTime(int waitTime) {
+ this.waitTime = waitTime;
+ }
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 5c0df31..b112e81 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -175,17 +175,16 @@ public class PostWorkflowManager {
String status = getStatusByProcess(processId);
+ logger.info("Updating the job status for job id : " + jobStatusResult.getJobId() + " with process id "
+ + processId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
+ saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState());
+
// TODO get cluster lock before that
if ("cancel".equals(status)) {
logger.info("Cancelled post workflow for process " + processId);
// TODO to be implemented
} else {
- logger.info("Updating the job status for job id : " + jobStatusResult.getJobId() + " with process id "
- + processId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
-
- saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState());
-
if (jobStatusResult.getState() == JobState.COMPLETE) {
logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id "
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index ff570ae..930efff 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -26,6 +26,8 @@ import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.helix.core.AbstractTask;
import org.apache.airavata.helix.core.OutPort;
import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask;
+import org.apache.airavata.helix.impl.task.cancel.RemoteJobCancellationTask;
import org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask;
import org.apache.airavata.helix.impl.task.env.EnvSetupTask;
import org.apache.airavata.helix.impl.task.staging.InputDataStagingTask;
@@ -91,7 +93,12 @@ public class PreWorkflowManager {
}
private List<String> getWorkflowsOfProcess(String processId) throws Exception {
- return this.curatorClient.getChildren().forPath("/registry/" + processId + "/workflows");
+ String path = "/registry/" + processId + "/workflows";
+ if (this.curatorClient.checkExists().forPath(path) != null) {
+ return this.curatorClient.getChildren().forPath(path);
+ } else {
+ return null;
+ }
}
private String createAndLaunchPreWorkflow(String processId, String gateway) throws Exception {
@@ -155,6 +162,12 @@ public class PreWorkflowManager {
}
private String createAndLaunchCancelWorkflow(String processId, String gateway) throws Exception {
+
+ ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway);
+ ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
+
+ String experimentId = processModel.getExperimentId();
+
registerCancelProcess(processId);
List<String> workflows = getWorkflowsOfProcess(processId);
final List<AbstractTask> allTasks = new ArrayList<>();
@@ -164,21 +177,50 @@ public class PreWorkflowManager {
WorkflowCancellationTask wfct = new WorkflowCancellationTask();
wfct.setTaskId(UUID.randomUUID().toString());
wfct.setCancellingWorkflowName(wf);
+
+ if (allTasks.size() > 0) {
+ allTasks.get(allTasks.size() -1).setNextTask(new OutPort(wfct.getTaskId(), wfct));
+ }
allTasks.add(wfct);
}
- WorkflowManager workflowManager = new WorkflowManager(
- ServerSettings.getSetting("helix.cluster.name"),
- ServerSettings.getSetting("post.workflow.manager.name"),
- ServerSettings.getZookeeperConnection());
-
- String workflow = workflowManager.launchWorkflow(processId + "-CANCEL-" + UUID.randomUUID().toString(), allTasks, true, false);
- logger.info("Started launching workflow " + workflow + " to cancel process " + processId);
- return workflow;
} else {
logger.info("No workflow registered with process " + processId + " to cancel");
return null;
}
+
+ RemoteJobCancellationTask rjct = new RemoteJobCancellationTask();
+ rjct.setTaskId(UUID.randomUUID().toString());
+ rjct.setExperimentId(experimentId);
+ rjct.setProcessId(processId);
+ rjct.setGatewayId(gateway);
+ rjct.setSkipTaskStatusPublish(true);
+
+ if (allTasks.size() > 0) {
+ allTasks.get(allTasks.size() -1).setNextTask(new OutPort(rjct.getTaskId(), rjct));
+ }
+ allTasks.add(rjct);
+
+ CancelCompletingTask cct = new CancelCompletingTask();
+ cct.setTaskId(UUID.randomUUID().toString());
+ cct.setExperimentId(experimentId);
+ cct.setProcessId(processId);
+ cct.setGatewayId(gateway);
+ cct.setSkipTaskStatusPublish(true);
+
+ if (allTasks.size() > 0) {
+ allTasks.get(allTasks.size() -1).setNextTask(new OutPort(cct.getTaskId(), cct));
+ }
+ allTasks.add(cct);
+
+ WorkflowManager workflowManager = new WorkflowManager(
+ ServerSettings.getSetting("helix.cluster.name"),
+ ServerSettings.getSetting("post.workflow.manager.name"),
+ ServerSettings.getZookeeperConnection());
+
+ String workflow = workflowManager.launchWorkflow(processId + "-CANCEL-" + UUID.randomUUID().toString(), allTasks, true, false);
+ logger.info("Started launching workflow " + workflow + " to cancel process " + processId);
+ return workflow;
}
public static void main(String[] args) throws Exception {
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.