You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/04/09 23:21:12 UTC
[airavata] branch develop updated: In an experiment cancel request,
marking the experiment is cancelled once the job is cancelled,
completed or failed
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new f912d39 In an experiment cancel request, marking the experiment is cancelled once the job is cancelled, completed or failed
f912d39 is described below
commit f912d39d37e85d0ac9b3a5c4a027714d17e208f2
Author: dimuthu <di...@gmail.com>
AuthorDate: Mon Apr 9 19:20:51 2018 -0400
In an experiment cancel request, marking the experiment is cancelled once the job is cancelled, completed or failed
---
.../airavata/helix/impl/task/AiravataTask.java | 1 +
.../impl/task/cancel/CancelCompletingTask.java | 11 +++-
.../task/cancel/RemoteJobCancellationTask.java | 58 ++++++++++++++++++----
.../helix/impl/workflow/PostWorkflowManager.java | 19 ++++++-
.../helix/impl/workflow/PreWorkflowManager.java | 30 ++---------
.../helix/impl/workflow/WorkflowManager.java | 35 +++++++++++++
6 files changed, 116 insertions(+), 38 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 6eb6456..c108bd3 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -81,6 +81,7 @@ public abstract class AiravataTask extends AbstractTask {
private boolean skipTaskStatusPublish = false;
protected TaskResult onSuccess(String message) {
+ logger.info(message);
if (!skipTaskStatusPublish) {
publishTaskState(TaskState.COMPLETED);
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
index 308cc0e..c8c39a7 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
@@ -17,7 +17,16 @@ public class CancelCompletingTask extends AiravataTask {
public TaskResult onRun(TaskHelper helper, TaskContext taskContext) {
logger.info("Starting cancel completing task for task " + getTaskId() + ", experiment id " + getExperimentId());
logger.info("Process " + getProcessId() + " successfully cancelled");
- saveAndPublishProcessStatus(ProcessState.CANCELED);
+ String cancelled = getContextVariable(RemoteJobCancellationTask.JOB_ALREADY_CANCELLED_OR_NOT_AVAILABLE);
+
+ if ("true".equals(cancelled)) {
+ // make the experiment state as cancelled if it is already being cancelled or similar state.
+ // Otherwise wait for the post workflow to cancel the experiment
+ logger.info("Making process as cancelled as the job is already being cancelled or not available");
+ saveAndPublishProcessStatus(ProcessState.CANCELED);
+ } else {
+ logger.info("Not updating process as cancelled as the job is not cancelled yet");
+ }
return onSuccess("Process " + getProcessId() + " successfully cancelled");
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
index a4aa2ac..3c5803e 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
@@ -9,6 +9,7 @@ import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfigura
import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.status.JobStatus;
import org.apache.helix.HelixManager;
import org.apache.helix.task.TaskResult;
import org.slf4j.Logger;
@@ -21,6 +22,8 @@ public class RemoteJobCancellationTask extends AiravataTask {
private final static Logger logger = LoggerFactory.getLogger(RemoteJobCancellationTask.class);
+ public static final String JOB_ALREADY_CANCELLED_OR_NOT_AVAILABLE = "job-already-cancelled";
+
@Override
public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
super.init(manager, workflowName, jobName, taskName);
@@ -35,6 +38,7 @@ public class RemoteJobCancellationTask extends AiravataTask {
logger.info("Fetching jobs for process " + getProcessId());
if (jobs == null || jobs.size() == 0) {
+ setContextVariable(JOB_ALREADY_CANCELLED_OR_NOT_AVAILABLE, "true");
return onSuccess("Can not find running jobs for process " + getProcessId());
}
@@ -48,29 +52,63 @@ public class RemoteJobCancellationTask extends AiravataTask {
getTaskContext().getJobSubmissionProtocol(),
getTaskContext().getPreferredJobSubmissionInterface()));
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getGatewayId(),
+ getTaskContext().getComputeResourceId(),
+ getTaskContext().getJobSubmissionProtocol(),
+ getTaskContext().getComputeResourceCredentialToken(),
+ getTaskContext().getComputeResourceLoginUserName());
+
for (String jobId : jobs) {
try {
+ logger.info("Fetching current job status for job id " + jobId);
+ RawCommandInfo monitorCommand = jobManagerConfiguration.getMonitorCommand(jobId);
+
+ CommandOutput jobMonitorOutput = adaptor.executeCommand(monitorCommand.getRawCommand(), null);
+
+ if (jobMonitorOutput.getExitCode() == 0) {
+ JobStatus jobStatus = jobManagerConfiguration.getParser().parseJobStatus(jobId, jobMonitorOutput.getStdOut());
+ if (jobStatus != null) {
+ logger.info("Job " + jobId + " state is " + jobStatus.getJobState().name());
+ switch (jobStatus.getJobState()) {
+ case COMPLETE:
+ case CANCELED:
+ case SUSPENDED:
+ case FAILED:
+ // if the job already is in above states, there is no use of trying cancellation
+ // setting context variable to be used in the Cancel Completing Task
+ setContextVariable(JOB_ALREADY_CANCELLED_OR_NOT_AVAILABLE, "true");
+ return onSuccess("Job already is in a saturated state");
+ }
+ } else {
+ logger.warn("Job status for job " + jobId + " is null. Std out " + jobMonitorOutput.getStdOut() +
+ ". Std err " + jobMonitorOutput.getStdError() + ". Job monitor command " + monitorCommand.getRawCommand());
+ }
+ } else {
+ logger.warn("Error while fetching the job " + jobId + " status. Std out " + jobMonitorOutput.getStdOut() +
+ ". Std err " + jobMonitorOutput.getStdError() + ". Job monitor command " + monitorCommand.getRawCommand());
+ }
+ } catch (Exception e) {
+ logger.error("Unknown error while fetching the job status but continuing..", e);
+ }
+
+ try {
logger.info("Cancelling job " + jobId + " of process " + getProcessId());
RawCommandInfo cancelCommand = jobManagerConfiguration.getCancelCommand(jobId);
logger.info("Command to cancel the job " + jobId + " : " + cancelCommand.getRawCommand());
- AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
- getTaskContext().getGatewayId(),
- getTaskContext().getComputeResourceId(),
- getTaskContext().getJobSubmissionProtocol(),
- getTaskContext().getComputeResourceCredentialToken(),
- getTaskContext().getComputeResourceLoginUserName());
+
logger.info("Running cancel command on compute host");
- CommandOutput commandOutput = adaptor.executeCommand(cancelCommand.getRawCommand(), null);
+ CommandOutput jobCancelOutput = adaptor.executeCommand(cancelCommand.getRawCommand(), null);
- if (commandOutput.getExitCode() != 0) {
+ if (jobCancelOutput.getExitCode() != 0) {
logger.error("Failed to execute job cancellation command for job " + jobId + " Sout : " +
- commandOutput.getStdOut() + ", Serr : " + commandOutput.getStdError());
+ jobCancelOutput.getStdOut() + ", Serr : " + jobCancelOutput.getStdError());
return onFail("Failed to execute job cancellation command for job " + jobId + " Sout : " +
- commandOutput.getStdOut() + ", Serr : " + commandOutput.getStdError(), true, null);
+ jobCancelOutput.getStdOut() + ", Serr : " + jobCancelOutput.getStdError(), true, null);
}
} catch (Exception ex) {
logger.error("Unknown error while canceling job " + jobId + " of process " + getProcessId());
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index c8a3f8d..7c85d31 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -28,6 +28,8 @@ import org.apache.airavata.helix.impl.task.*;
import org.apache.airavata.helix.impl.task.completing.CompletingTask;
import org.apache.airavata.helix.impl.task.staging.ArchiveTask;
import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.monitor.JobStateValidator;
import org.apache.airavata.monitor.JobStatusResult;
import org.apache.airavata.monitor.kafka.JobStatusResultDeserializer;
@@ -196,8 +198,21 @@ public class PostWorkflowManager extends WorkflowManager {
// TODO get cluster lock before that
if ("cancel".equals(processStatus)) {
- logger.info("Cancelled post workflow for process " + processId);
- // TODO to be implemented
+ logger.info("Cancelled post workflow for process " + processId + " in experiment " + experimentId);
+ // This will mark an cancelling Experiment into a cancelled status for a set of valid job statuses
+ switch (jobStatusResult.getState()) {
+ case FAILED:
+ case SUSPENDED:
+ case CANCELED:
+ case COMPLETE:
+ logger.info("Job " + jobStatusResult.getJobId() + " status is " + jobStatusResult.getState() +
+ " so marking experiment " + experimentId + " as cancelled" );
+ publishProcessStatus(processId, experimentId, gateway, ProcessState.CANCELED);
+ break;
+ default:
+ logger.warn("Job " + jobStatusResult.getJobId() + " status " + jobStatusResult.getState() +
+ " is invalid to mark experiment " + experimentId + " as cancelled");
+ }
} else {
if (jobStatusResult.getState() == JobState.COMPLETE || jobStatusResult.getState() == JobState.FAILED) {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 78aa1a6..dce98c3 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -253,20 +253,21 @@ public class PreWorkflowManager extends WorkflowManager {
}
String processId = event.getProcessId();
+ String experimentId = event.getExperimentId();
String gateway = event.getGatewayId();
- logger.info("Received process launch message for process " + processId + " in gateway " + gateway);
+ logger.info("Received process launch message for process " + processId + " of experiment " + experimentId + " in gateway " + gateway);
try {
- logger.info("Launching the pre workflow for process " + processId + " in gateway " + gateway);
+ logger.info("Launching the pre workflow for process " + processId + " of experiment " + experimentId + " in gateway " + gateway);
String workflowName = createAndLaunchPreWorkflow(processId);
- logger.info("Completed launching the pre workflow " + workflowName + " for process " + processId + " in gateway " + gateway);
+ logger.info("Completed launching the pre workflow " + workflowName + " for process" + processId + " of experiment " + experimentId + " in gateway " + gateway);
// updating the process status
ProcessStatus status = new ProcessStatus();
status.setState(ProcessState.STARTED);
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
- publishProcessStatus(event, status);
+ publishProcessStatus(processId, experimentId, gateway, ProcessState.STARTED);
subscriber.sendAck(messageContext.getDeliveryTag());
} catch (Exception e) {
logger.error("Failed to launch the pre workflow for process " + processId + " in gateway " + gateway, e);
@@ -305,25 +306,4 @@ public class PreWorkflowManager extends WorkflowManager {
}
}
}
-
- private void publishProcessStatus(ProcessSubmitEvent event, ProcessStatus status) throws AiravataException {
-
- RegistryService.Client registryClient = getRegistryClientPool().getResource();
-
- try {
- registryClient.updateProcessStatus(status, event.getProcessId());
- getRegistryClientPool().returnResource(registryClient);
-
- } catch (Exception e) {
- logger.error("Failed to update process status " + event.getProcessId(), e);
- getRegistryClientPool().returnBrokenResource(registryClient);
- }
-
- ProcessIdentifier identifier = new ProcessIdentifier(event.getProcessId(), event.getExperimentId(), event.getGatewayId());
- ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
- MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
- AiravataUtils.getId(MessageType.PROCESS.name()), event.getGatewayId());
- msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);
- }
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
index c8fe1be..7413ad5 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
@@ -2,12 +2,19 @@ package org.apache.airavata.helix.impl.workflow;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftClientPool;
import org.apache.airavata.helix.workflow.WorkflowOperator;
+import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.Type;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.registry.api.RegistryService;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.curator.RetryPolicy;
@@ -17,6 +24,8 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Calendar;
+
public class WorkflowManager {
private final static Logger logger = LoggerFactory.getLogger(WorkflowManager.class);
@@ -86,4 +95,30 @@ public class WorkflowManager {
public ThriftClientPool<RegistryService.Client> getRegistryClientPool() {
return registryClientPool;
}
+
+ public void publishProcessStatus(String processId, String experimentId, String gatewayId, ProcessState state)
+ throws AiravataException {
+
+ ProcessStatus status = new ProcessStatus();
+ status.setState(state);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+
+ RegistryService.Client registryClient = getRegistryClientPool().getResource();
+
+ try {
+ registryClient.updateProcessStatus(status, processId);
+ getRegistryClientPool().returnResource(registryClient);
+
+ } catch (Exception e) {
+ logger.error("Failed to update process status " + processId, e);
+ getRegistryClientPool().returnBrokenResource(registryClient);
+ }
+
+ ProcessIdentifier identifier = new ProcessIdentifier(processId, experimentId, gatewayId);
+ ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
+ MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+ AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ getStatusPublisher().publish(msgCtx);
+ }
}
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.