You are viewing a plain-text version of this content. The canonical version is available in the mailing-list archive (the original hyperlink was lost in this plain-text copy).
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/23 19:10:24 UTC
[airavata] branch develop updated: Implementing workflow level cancellation
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new 1e090ff Implementing workflow level cancellation
1e090ff is described below
commit 1e090ffda8521b299cdbf512be061888171cf83a
Author: dimuthu <di...@gmail.com>
AuthorDate: Fri Mar 23 15:10:17 2018 -0400
Implementing workflow level cancellation
---
.../helix/impl/participant/GlobalParticipant.java | 3 +-
.../airavata/helix/impl/task/AiravataTask.java | 14 +--
.../task/cancel/RemoteJobCancellationTask.java | 20 +++++
.../impl/task/cancel/WorkflowCancellationTask.java | 75 ++++++++++++++++
.../impl/task/submission/JobSubmissionTask.java | 3 +-
.../helix/impl/workflow/PostWorkflowManager.java | 91 ++++++++++++++-----
.../helix/impl/workflow/PreWorkflowManager.java | 100 +++++++++++++++++++--
.../apache/airavata/helix/core/AbstractTask.java | 27 ++++++
.../apache/airavata/helix/core/util/TaskUtil.java | 5 +-
.../messaging/core/impl/ProcessConsumer.java | 14 +++
.../airavata/orchestrator/cpi/Orchestrator.java | 3 +-
.../cpi/impl/SimpleOrchestratorImpl.java | 31 ++++---
.../server/OrchestratorServerHandler.java | 20 +++++
13 files changed, 347 insertions(+), 59 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index fdca3fb..f0a5c23 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -43,7 +43,8 @@ public class GlobalParticipant extends HelixParticipant {
"org.apache.airavata.helix.impl.task.submission.ForkJobSubmissionTask",
"org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask",
"org.apache.airavata.helix.impl.task.submission.LocalJobSubmissionTask",
- "org.apache.airavata.helix.impl.task.staging.ArchiveTask"
+ "org.apache.airavata.helix.impl.task.staging.ArchiveTask",
+ "org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask"
};
public Map<String, TaskFactory> getTaskFactory() {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index a5e7e02..01a7c15 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -75,16 +75,11 @@ public abstract class AiravataTask extends AbstractTask {
@TaskParam(name = "Skip Status Publish")
private boolean skipTaskStatusPublish = false;
- @TaskOutPort(name = "Next Task")
- private OutPort nextTask;
-
protected TaskResult onSuccess(String message) {
if (!skipTaskStatusPublish) {
publishTaskState(TaskState.COMPLETED);
}
- String successMessage = "Task " + getTaskId() + " completed." + (message != null ? " Message : " + message : "");
- logger.info(successMessage);
- return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
+ return super.onSuccess(message);
}
protected TaskResult onFail(String reason, boolean fatal, Throwable error) {
@@ -120,7 +115,8 @@ public abstract class AiravataTask extends AbstractTask {
saveProcessError(errorModel);
saveTaskError(errorModel);
}
- return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, errorMessage);
+
+ return onFail(errorMessage, fatal);
}
protected void saveAndPublishProcessStatus(ProcessState state) {
@@ -398,10 +394,6 @@ public abstract class AiravataTask extends AbstractTask {
return processModel;
}
- public void setNextTask(OutPort nextTask) {
- this.nextTask = nextTask;
- }
-
public void setSkipTaskStatusPublish(boolean skipTaskStatusPublish) {
this.skipTaskStatusPublish = skipTaskStatusPublish;
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
new file mode 100644
index 0000000..2cfb317
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
@@ -0,0 +1,20 @@
+package org.apache.airavata.helix.impl.task.cancel;
+
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.task.TaskResult;
+
+@TaskDef(name = "Remote Job Cancellation Task")
+public class RemoteJobCancellationTask extends AbstractTask {
+
+ @Override
+ public TaskResult onRun(TaskHelper helper) {
+ return null;
+ }
+
+ @Override
+ public void onCancel() {
+
+ }
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
new file mode 100644
index 0000000..3d1b03d
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
@@ -0,0 +1,75 @@
+package org.apache.airavata.helix.impl.task.cancel;
+
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+@TaskDef(name = "Workflow Cancellation Task")
+public class WorkflowCancellationTask extends AbstractTask {
+
+ private static final Logger logger = LogManager.getLogger(WorkflowCancellationTask.class);
+
+ private TaskDriver taskDriver;
+
+ @TaskParam(name = "Cancelling Workflow")
+ private String cancellingWorkflowName;
+
+ @Override
+ public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
+ super.init(manager, workflowName, jobName, taskName);
+
+ try {
+
+ HelixManager helixManager = HelixManagerFactory.getZKHelixManager(ServerSettings.getSetting("helix.cluster.name"), taskName,
+ InstanceType.SPECTATOR, ServerSettings.getZookeeperConnection());
+ helixManager.connect();
+ Runtime.getRuntime().addShutdownHook(
+ new Thread() {
+ @Override
+ public void run() {
+ helixManager.disconnect();
+ }
+ }
+ );
+ taskDriver = new TaskDriver(helixManager);
+ } catch (Exception e) {
+ logger.error("Failed to build Helix Task driver in " + taskName, e);
+ throw new RuntimeException("Failed to build Helix Task driver in " + taskName, e);
+ }
+ }
+
+ @Override
+ public TaskResult onRun(TaskHelper helper) {
+ logger.info("Cancelling workflow " + cancellingWorkflowName);
+ try {
+ taskDriver.stop(cancellingWorkflowName);
+ logger.info("Workflow " + cancellingWorkflowName + " cancelled");
+ return onSuccess("Successfully cancelled workflow " + cancellingWorkflowName);
+ } catch (Exception e) {
+ logger.error("Failed to stop workflow " + cancellingWorkflowName, e);
+ return onFail("Failed to stop workflow " + cancellingWorkflowName + ": " + e.getMessage(), true);
+ }
+ }
+
+ @Override
+ public void onCancel() {
+
+ }
+
+ public String getCancellingWorkflowName() {
+ return cancellingWorkflowName;
+ }
+
+ public void setCancellingWorkflowName(String cancellingWorkflowName) {
+ this.cancellingWorkflowName = cancellingWorkflowName;
+ }
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
index 0f7799d..859e666 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
@@ -65,7 +65,6 @@ public abstract class JobSubmissionTask extends AiravataTask {
this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
this.curatorClient.start();
} catch (ApplicationSettingsException e) {
- e.printStackTrace();
logger.error("Failed to create curator client ", e);
throw new RuntimeException(e);
}
@@ -93,6 +92,8 @@ public abstract class JobSubmissionTask extends AiravataTask {
"/monitoring/" + jobId + "/experiment", getExperimentId().getBytes());
getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
"/monitoring/" + jobId + "/status", "pending".getBytes());
+ getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+ "/registry/" + getProcessId() + "/jobs/" + jobId, new byte[0]);
}
@SuppressWarnings("WeakerAccess")
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index f01c2f5..5c0df31 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -60,6 +60,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.*;
@@ -90,29 +91,59 @@ public class PostWorkflowManager {
return consumer;
}
+ private void registerWorkflow(String processId, String workflowId) throws Exception {
+ this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+ "/registry/" + processId + "/workflows/" + workflowId , new byte[0]);
+ }
+
private String getExperimentIdByJobId(String jobId) throws Exception {
- byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/experiment");
- return new String(processBytes);
+ String path = "/monitoring/" + jobId + "/experiment";
+ if (this.curatorClient.checkExists().forPath(path) != null) {
+ byte[] processBytes = this.curatorClient.getData().forPath(path);
+ return new String(processBytes);
+ } else {
+ return null;
+ }
}
private String getTaskIdByJobId(String jobId) throws Exception {
- byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/task");
- return new String(processBytes);
+ String path = "/monitoring/" + jobId + "/task";
+ if (this.curatorClient.checkExists().forPath(path) != null) {
+ byte[] processBytes = this.curatorClient.getData().forPath(path);
+ return new String(processBytes);
+ } else {
+ return null;
+ }
}
private String getProcessIdByJobId(String jobId) throws Exception {
- byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/process");
- return new String(processBytes);
+ String path = "/monitoring/" + jobId + "/process";
+ if (this.curatorClient.checkExists().forPath(path) != null) {
+ byte[] processBytes = this.curatorClient.getData().forPath(path);
+ return new String(processBytes);
+ } else {
+ return null;
+ }
}
private String getGatewayByJobId(String jobId) throws Exception {
- byte[] gatewayBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/gateway");
- return new String(gatewayBytes);
+ String path = "/monitoring/" + jobId + "/gateway";
+ if (this.curatorClient.checkExists().forPath(path) != null) {
+ byte[] gatewayBytes = this.curatorClient.getData().forPath(path);
+ return new String(gatewayBytes);
+ } else {
+ return null;
+ }
}
- private String getStatusByJobId(String jobId) throws Exception {
- byte[] statusBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/status");
- return new String(statusBytes);
+ private String getStatusByProcess(String processId) throws Exception {
+ String path = "/registry/" + processId + "/status";
+ if (this.curatorClient.checkExists().forPath(path) != null) {
+ byte[] statusBytes = this.curatorClient.getData().forPath(path);
+ return new String(statusBytes);
+ } else {
+ return null;
+ }
}
private boolean hasMonitoringRegistered(String jobId) throws Exception {
@@ -130,23 +161,36 @@ public class PostWorkflowManager {
logger.info("Processing job result " + jobStatusResult.getJobId());
if (hasMonitoringRegistered(jobStatusResult.getJobId())) {
- String gateway = getGatewayByJobId(jobStatusResult.getJobId());
- String processId = getProcessIdByJobId(jobStatusResult.getJobId());
- String experimentId = getExperimentIdByJobId(jobStatusResult.getJobId());
- String task = getTaskIdByJobId(jobStatusResult.getJobId());
- String status = getStatusByJobId(jobStatusResult.getJobId());
+ String gateway = Optional.ofNullable(getGatewayByJobId(jobStatusResult.getJobId()))
+ .orElseThrow(() -> new Exception("Can not find the gateway for job id " + jobStatusResult.getJobId()));
+
+ String processId = Optional.ofNullable(getProcessIdByJobId(jobStatusResult.getJobId()))
+ .orElseThrow(() -> new Exception("Can not find the process for job id " + jobStatusResult.getJobId()));
- logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id "
- + processId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
+ String experimentId = Optional.ofNullable(getExperimentIdByJobId(jobStatusResult.getJobId()))
+ .orElseThrow(() -> new Exception("Can not find the experiment for job id " + jobStatusResult.getJobId()));
+
+ String task = Optional.ofNullable(getTaskIdByJobId(jobStatusResult.getJobId()))
+ .orElseThrow(() -> new Exception("Can not find the task for job id " + jobStatusResult.getJobId()));
+
+ String status = getStatusByProcess(processId);
// TODO get cluster lock before that
- if ("cancelled".equals(status)) {
+ if ("cancel".equals(status)) {
+ logger.info("Cancelled post workflow for process " + processId);
// TODO to be implemented
} else {
+ logger.info("Updating the job status for job id : " + jobStatusResult.getJobId() + " with process id "
+ + processId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
+
saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState());
if (jobStatusResult.getState() == JobState.COMPLETE) {
+
+ logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id "
+ + processId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
+
logger.info("Job " + jobStatusResult.getJobId() + " was completed");
ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway);
@@ -211,9 +255,14 @@ public class PostWorkflowManager {
ServerSettings.getSetting("post.workflow.manager.name"),
ServerSettings.getZookeeperConnection());
- workflowManager.launchWorkflow(processId + "-POST-" + UUID.randomUUID().toString(),
+ String workflowName = workflowManager.launchWorkflow(processId + "-POST-" + UUID.randomUUID().toString(),
new ArrayList<>(allTasks), true, false);
-
+ try {
+ registerWorkflow(processId, workflowName);
+ } catch (Exception e) {
+ logger.error("Failed to save workflow " + workflowName + " of process " + processId + " in zookeeper registry. " +
+ "This will affect cancellation tasks", e);
+ }
} else if (jobStatusResult.getState() == JobState.CANCELED) {
logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled");
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index ff5f9ea..ff570ae 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -20,11 +20,13 @@
package org.apache.airavata.helix.impl.workflow;
import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.helix.core.AbstractTask;
import org.apache.airavata.helix.core.OutPort;
import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask;
import org.apache.airavata.helix.impl.task.env.EnvSetupTask;
import org.apache.airavata.helix.impl.task.staging.InputDataStagingTask;
import org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask;
@@ -33,16 +35,22 @@ import org.apache.airavata.messaging.core.*;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.ProcessTerminateEvent;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.task.TaskModel;
import org.apache.airavata.model.task.TaskTypes;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
+import org.apache.zookeeper.CreateMode;
import java.util.ArrayList;
import java.util.List;
@@ -54,13 +62,36 @@ public class PreWorkflowManager {
private static final Logger logger = LogManager.getLogger(PreWorkflowManager.class);
- private final Subscriber subscriber;
+ private Subscriber subscriber;
+ private CuratorFramework curatorClient = null;
@SuppressWarnings("WeakerAccess")
public PreWorkflowManager() throws AiravataException {
+ init();
+ }
+
+ private void init() throws AiravataException {
List<String> routingKeys = new ArrayList<>();
routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName());
this.subscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(), routingKeys, Type.PROCESS_LAUNCH);
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
+ this.curatorClient.start();
+ }
+
+ private void registerWorkflow(String processId, String workflowId) throws Exception {
+ this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+ "/registry/" + processId + "/workflows/" + workflowId , new byte[0]);
+ }
+
+ private void registerCancelProcess(String processId) throws Exception {
+ this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+ "/registry/" + processId + "/status" , "cancel".getBytes());
+ }
+
+ private List<String> getWorkflowsOfProcess(String processId) throws Exception {
+ return this.curatorClient.getChildren().forPath("/registry/" + processId + "/workflows");
}
private String createAndLaunchPreWorkflow(String processId, String gateway) throws Exception {
@@ -90,9 +121,7 @@ public class PreWorkflowManager {
airavataTask.setRetryCount(1);
jobSubmissionFound = true;
} else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) {
- if (jobSubmissionFound) {
- //airavataTask = new OutputDataStagingTask();
- } else {
+ if (!jobSubmissionFound) {
airavataTask = new InputDataStagingTask();
}
}
@@ -116,9 +145,42 @@ public class PreWorkflowManager {
ServerSettings.getZookeeperConnection());
String workflowName = workflowManager.launchWorkflow(processId + "-PRE-" + UUID.randomUUID().toString(),
new ArrayList<>(allTasks), true, false);
+ try {
+ registerWorkflow(processId, workflowName);
+ } catch (Exception e) {
+ logger.error("Failed to save workflow " + workflowName + " of process " + processId + " in zookeeper registry. " +
+ "This will affect cancellation tasks", e);
+ }
return workflowName;
}
+ private String createAndLaunchCancelWorkflow(String processId, String gateway) throws Exception {
+ registerCancelProcess(processId);
+ List<String> workflows = getWorkflowsOfProcess(processId);
+ final List<AbstractTask> allTasks = new ArrayList<>();
+ if (workflows != null && workflows.size() > 0) {
+ for (String wf : workflows) {
+ logger.info("Creating cancellation task for workflow " + wf + " of process " + processId);
+ WorkflowCancellationTask wfct = new WorkflowCancellationTask();
+ wfct.setTaskId(UUID.randomUUID().toString());
+ wfct.setCancellingWorkflowName(wf);
+ allTasks.add(wfct);
+ }
+
+ WorkflowManager workflowManager = new WorkflowManager(
+ ServerSettings.getSetting("helix.cluster.name"),
+ ServerSettings.getSetting("post.workflow.manager.name"),
+ ServerSettings.getZookeeperConnection());
+
+ String workflow = workflowManager.launchWorkflow(processId + "-CANCEL-" + UUID.randomUUID().toString(), allTasks, true, false);
+ logger.info("Started launching workflow " + workflow + " to cancel process " + processId);
+ return workflow;
+ } else {
+ logger.info("No workflow registered with process " + processId + " to cancel");
+ return null;
+ }
+ }
+
public static void main(String[] args) throws Exception {
PreWorkflowManager preWorkflowManager = new PreWorkflowManager();
}
@@ -147,14 +209,40 @@ public class PreWorkflowManager {
logger.info("Received process launch message for process " + processId + " in gateway " + gateway);
try {
- logger.info("Launching the pre workflow for process " + processId + " in gateway " + gateway );
+ logger.info("Launching the pre workflow for process " + processId + " in gateway " + gateway);
String workflowName = createAndLaunchPreWorkflow(processId, gateway);
- logger.info("Completed launching the pre workflow " + workflowName + " for process " + processId + " in gateway " + gateway );
+ logger.info("Completed launching the pre workflow " + workflowName + " for process " + processId + " in gateway " + gateway);
subscriber.sendAck(messageContext.getDeliveryTag());
} catch (Exception e) {
logger.error("Failed to launch the pre workflow for process " + processId + " in gateway " + gateway, e);
//subscriber.sendAck(messageContext.getDeliveryTag());
}
+ } else if (messageContext.getType().equals(MessageType.TERMINATEPROCESS)) {
+ ProcessTerminateEvent event = new ProcessTerminateEvent();
+ TBase messageEvent = messageContext.getEvent();
+
+ try {
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ } catch (TException e) {
+ logger.error("Failed to fetch process cancellation event", e);
+ subscriber.sendAck(messageContext.getDeliveryTag());
+ }
+
+ String processId = event.getProcessId();
+ String gateway = event.getGatewayId();
+
+ logger.info("Received process cancel message for process " + processId + " in gateway " + gateway);
+
+ try {
+ logger.info("Launching the process cancel workflow for process " + processId + " in gateway " + gateway);
+ String workflowName = createAndLaunchCancelWorkflow(processId, gateway);
+ logger.info("Completed process cancel workflow " + workflowName + " for process " + processId + " in gateway " + gateway);
+ subscriber.sendAck(messageContext.getDeliveryTag());
+ } catch (Exception e) {
+ logger.error("Failed to launch process cancel workflow for process " + processId + " in gateway " + gateway, e);
+ //subscriber.sendAck(messageContext.getDeliveryTag());
+ }
} else {
logger.warn("Unknown message type");
subscriber.sendAck(messageContext.getDeliveryTag());
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
index bae36e6..2c4b5a8 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
@@ -21,12 +21,16 @@ package org.apache.airavata.helix.core;
import org.apache.airavata.helix.core.util.TaskUtil;
import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.airavata.model.status.TaskState;
import org.apache.helix.HelixManager;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskResult;
import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
* TODO: Class level comments please
@@ -36,12 +40,17 @@ import org.apache.helix.task.UserContentStore;
*/
public abstract class AbstractTask extends UserContentStore implements Task {
+ private static final Logger logger = LogManager.getLogger(AbstractTask.class);
+
private static final String NEXT_JOB = "next-job";
private static final String WORKFLOW_STARTED = "workflow-started";
@TaskParam(name = "taskId")
private String taskId;
+ @TaskOutPort(name = "Next Task")
+ private OutPort nextTask;
+
private TaskCallbackContext callbackContext;
private TaskHelper taskHelper;
@@ -71,6 +80,7 @@ public abstract class AbstractTask extends UserContentStore implements Task {
@Override
public final void cancel() {
+ logger.info("Cancelling task " + taskId);
onCancel();
}
@@ -78,6 +88,15 @@ public abstract class AbstractTask extends UserContentStore implements Task {
public abstract void onCancel();
+ protected TaskResult onSuccess(String message) {
+ String successMessage = "Task " + getTaskId() + " completed." + (message != null ? " Message : " + message : "");
+ logger.info(successMessage);
+ return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
+ }
+
+ protected TaskResult onFail(String reason, boolean fatal) {
+ return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, reason);
+ }
protected void publishErrors(Throwable e) {
// TODO Publish through kafka channel with task and workflow id
e.printStackTrace();
@@ -134,4 +153,12 @@ public abstract class AbstractTask extends UserContentStore implements Task {
public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
+
+ public OutPort getNextTask() {
+ return nextTask;
+ }
+
+ public void setNextTask(OutPort nextTask) {
+ this.nextTask = nextTask;
+ }
}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
index f58b365..4514f3a 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
@@ -25,10 +25,7 @@ import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
import org.apache.airavata.helix.task.api.annotation.TaskParam;
import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* TODO: Class level comments please
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
index 8cf9874..f6f1127 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
@@ -32,6 +32,7 @@ import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.model.messaging.event.Message;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.ProcessTerminateEvent;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -79,6 +80,19 @@ public class ProcessConsumer extends QueueingConsumer{
messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
messageContext.setIsRedeliver(envelope.isRedeliver());
handler.onMessage(messageContext);
+ } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) {
+ ProcessTerminateEvent processTerminateEvent = new ProcessTerminateEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), processTerminateEvent);
+ log.info(" Message Received with message id '" + message.getMessageId()
+ + " and with message type:" + message.getMessageType() + ", for processId:" +
+ processTerminateEvent.getProcessId());
+ event = processTerminateEvent;
+ gatewayId = processTerminateEvent.getGatewayId();
+ MessageContext messageContext = new MessageContext(event, message.getMessageType(),
+ message.getMessageId(), gatewayId, deliveryTag);
+ messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+ messageContext.setIsRedeliver(envelope.isRedeliver());
+ handler.onMessage(messageContext);
} else {
log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " +
"delivery tag {} ", message.getMessageType().name(), deliveryTag);
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
index 8277387..9aa788e 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
@@ -71,11 +71,10 @@ public interface Orchestrator {
* We just have to give the experimentID
*
* @param experiment
- * @param processModel
* @param tokenId
* @throws OrchestratorException
*/
- void cancelExperiment(ExperimentModel experiment, ProcessModel processModel, String tokenId) throws OrchestratorException;
+ void cancelExperiment(ExperimentModel experiment, String tokenId) throws OrchestratorException;
//todo have to add another method to handle failed or jobs to be recovered by orchestrator
//todo if you don't add these this is not an orchestrator, its just an intemediate component which invoke gfac
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index ff4a5e6..53f2a94 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -239,19 +239,24 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
}
- public void cancelExperiment(ExperimentModel experiment, ProcessModel processModel, String tokenId)
- throws OrchestratorException {
- // FIXME
-// List<JobDetails> jobDetailsList = task.getJobDetailsList();
-// for(JobDetails jobDetails:jobDetailsList) {
-// JobState jobState = jobDetails.getJobStatuses().getJobState();
-// if (jobState.getValue() > 4){
-// logger.error("Cannot cancel the job, because current job state is : " + jobState.toString() +
-// "jobId: " + jobDetails.getJobID() + " Job Name: " + jobDetails.getJobName());
-// return;
-// }
-// }
-// jobSubmitter.terminate(experiment.getExperimentID(),task.getTaskID(),tokenId);
+ public void cancelExperiment(ExperimentModel experiment, String tokenId) throws OrchestratorException {
+ logger.info("Terminating experiment " + experiment.getExperimentId());
+ RegistryService.Client registryServiceClient = getRegistryServiceClient();
+
+ try {
+ List<String> processIds = registryServiceClient.getProcessIds(experiment.getExperimentId());
+ if (processIds != null && processIds.size() > 0) {
+ for (String processId : processIds) {
+ logger.info("Terminating process " + processId + " of experiment " + experiment.getExperimentId());
+ jobSubmitter.terminate(experiment.getExperimentId(), processId, tokenId);
+ }
+ } else {
+ logger.warn("No processes found for experiment " + experiment.getExperimentId() + " to cancel");
+ }
+ } catch (TException e) {
+ logger.error("Failed to fetch process ids for experiment " + experiment.getExperimentId(), e);
+ throw new OrchestratorException("Failed to fetch process ids for experiment " + experiment.getExperimentId(), e);
+ }
}
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 0934d1a..5fd122b 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -428,6 +428,26 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
log.warn("Experiment termination is only allowed for launched experiments.");
return false;
default:
+ ExperimentModel experimentModel = registryClient.getExperiment(experimentId);
+
+ ComputeResourcePreference computeResourcePreference = registryClient.getGatewayComputeResourcePreference
+ (gatewayId,
+ experimentModel.getUserConfigurationData().getComputationalResourceScheduling().getResourceHostId());
+ String token = computeResourcePreference.getResourceSpecificCredentialStoreToken();
+ if (token == null || token.isEmpty()){
+ // try with gateway profile level token
+ GatewayResourceProfile gatewayProfile = registryClient.getGatewayResourceProfile(gatewayId);
+ token = gatewayProfile.getCredentialStoreToken();
+ }
+ // still the token is empty, then we fail the experiment
+ if (token == null || token.isEmpty()){
+ log.error("You have not configured credential store token at gateway profile or compute resource preference." +
+ " Please provide the correct token at gateway profile or compute resource preference.");
+ return false;
+ }
+
+ orchestrator.cancelExperiment(experimentModel, token);
+ // TODO deprecate this approach as we are replacing gfac
String expCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
experimentId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
Stat stat = curatorClient.checkExists().forPath(expCancelNodePath);
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.