You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/23 19:10:24 UTC

[airavata] branch develop updated: Implementing workflow level cancellation

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1e090ff  Implementing workflow level cancellation
1e090ff is described below

commit 1e090ffda8521b299cdbf512be061888171cf83a
Author: dimuthu <di...@gmail.com>
AuthorDate: Fri Mar 23 15:10:17 2018 -0400

    Implementing workflow level cancellation
---
 .../helix/impl/participant/GlobalParticipant.java  |   3 +-
 .../airavata/helix/impl/task/AiravataTask.java     |  14 +--
 .../task/cancel/RemoteJobCancellationTask.java     |  20 +++++
 .../impl/task/cancel/WorkflowCancellationTask.java |  75 ++++++++++++++++
 .../impl/task/submission/JobSubmissionTask.java    |   3 +-
 .../helix/impl/workflow/PostWorkflowManager.java   |  91 ++++++++++++++-----
 .../helix/impl/workflow/PreWorkflowManager.java    | 100 +++++++++++++++++++--
 .../apache/airavata/helix/core/AbstractTask.java   |  27 ++++++
 .../apache/airavata/helix/core/util/TaskUtil.java  |   5 +-
 .../messaging/core/impl/ProcessConsumer.java       |  14 +++
 .../airavata/orchestrator/cpi/Orchestrator.java    |   3 +-
 .../cpi/impl/SimpleOrchestratorImpl.java           |  31 ++++---
 .../server/OrchestratorServerHandler.java          |  20 +++++
 13 files changed, 347 insertions(+), 59 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index fdca3fb..f0a5c23 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -43,7 +43,8 @@ public class GlobalParticipant extends HelixParticipant {
         "org.apache.airavata.helix.impl.task.submission.ForkJobSubmissionTask",
         "org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask",
         "org.apache.airavata.helix.impl.task.submission.LocalJobSubmissionTask",
-        "org.apache.airavata.helix.impl.task.staging.ArchiveTask"
+        "org.apache.airavata.helix.impl.task.staging.ArchiveTask",
+        "org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask"
     };
 
     public Map<String, TaskFactory> getTaskFactory() {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index a5e7e02..01a7c15 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -75,16 +75,11 @@ public abstract class AiravataTask extends AbstractTask {
     @TaskParam(name = "Skip Status Publish")
     private boolean skipTaskStatusPublish = false;
 
-    @TaskOutPort(name = "Next Task")
-    private OutPort nextTask;
-
     protected TaskResult onSuccess(String message) {
         if (!skipTaskStatusPublish) {
             publishTaskState(TaskState.COMPLETED);
         }
-        String successMessage = "Task " + getTaskId() + " completed." + (message != null ? " Message : " + message : "");
-        logger.info(successMessage);
-        return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
+        return super.onSuccess(message);
     }
 
     protected TaskResult onFail(String reason, boolean fatal, Throwable error) {
@@ -120,7 +115,8 @@ public abstract class AiravataTask extends AbstractTask {
             saveProcessError(errorModel);
             saveTaskError(errorModel);
         }
-        return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, errorMessage);
+
+        return onFail(errorMessage, fatal);
     }
 
     protected void saveAndPublishProcessStatus(ProcessState state) {
@@ -398,10 +394,6 @@ public abstract class AiravataTask extends AbstractTask {
         return processModel;
     }
 
-    public void setNextTask(OutPort nextTask) {
-        this.nextTask = nextTask;
-    }
-
     public void setSkipTaskStatusPublish(boolean skipTaskStatusPublish) {
         this.skipTaskStatusPublish = skipTaskStatusPublish;
     }
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
new file mode 100644
index 0000000..2cfb317
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
@@ -0,0 +1,20 @@
+package org.apache.airavata.helix.impl.task.cancel;
+
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.task.TaskResult;
+
+@TaskDef(name = "Remote Job Cancellation Task")
+public class RemoteJobCancellationTask extends AbstractTask {
+
+    @Override
+    public TaskResult onRun(TaskHelper helper) {
+        return null;
+    }
+
+    @Override
+    public void onCancel() {
+
+    }
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
new file mode 100644
index 0000000..3d1b03d
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
@@ -0,0 +1,75 @@
+package org.apache.airavata.helix.impl.task.cancel;
+
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+@TaskDef(name = "Workflow Cancellation Task")
+public class WorkflowCancellationTask extends AbstractTask {
+
+    private static final Logger logger = LogManager.getLogger(WorkflowCancellationTask.class);
+
+    private TaskDriver taskDriver;
+
+    @TaskParam(name = "Cancelling Workflow")
+    private String cancellingWorkflowName;
+
+    @Override
+    public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
+        super.init(manager, workflowName, jobName, taskName);
+
+        try {
+
+            HelixManager helixManager = HelixManagerFactory.getZKHelixManager(ServerSettings.getSetting("helix.cluster.name"), taskName,
+                    InstanceType.SPECTATOR, ServerSettings.getZookeeperConnection());
+            helixManager.connect();
+            Runtime.getRuntime().addShutdownHook(
+                    new Thread() {
+                        @Override
+                        public void run() {
+                            helixManager.disconnect();
+                        }
+                    }
+            );
+            taskDriver = new TaskDriver(helixManager);
+        } catch (Exception e) {
+            logger.error("Failed to build Helix Task driver in " + taskName, e);
+            throw new RuntimeException("Failed to build Helix Task driver in " + taskName, e);
+        }
+    }
+
+    @Override
+    public TaskResult onRun(TaskHelper helper) {
+        logger.info("Cancelling workflow " + cancellingWorkflowName);
+        try {
+            taskDriver.stop(cancellingWorkflowName);
+            logger.info("Workflow " + cancellingWorkflowName + " cancelled");
+            return onSuccess("Successfully cancelled workflow " + cancellingWorkflowName);
+        } catch (Exception e) {
+            logger.error("Failed to stop workflow " + cancellingWorkflowName, e);
+            return onFail("Failed to stop workflow " + cancellingWorkflowName + ": " + e.getMessage(), true);
+        }
+    }
+
+    @Override
+    public void onCancel() {
+
+    }
+
+    public String getCancellingWorkflowName() {
+        return cancellingWorkflowName;
+    }
+
+    public void setCancellingWorkflowName(String cancellingWorkflowName) {
+        this.cancellingWorkflowName = cancellingWorkflowName;
+    }
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
index 0f7799d..859e666 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
@@ -65,7 +65,6 @@ public abstract class JobSubmissionTask extends AiravataTask {
             this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
             this.curatorClient.start();
         } catch (ApplicationSettingsException e) {
-            e.printStackTrace();
             logger.error("Failed to create curator client ", e);
             throw new RuntimeException(e);
         }
@@ -93,6 +92,8 @@ public abstract class JobSubmissionTask extends AiravataTask {
                 "/monitoring/" + jobId + "/experiment", getExperimentId().getBytes());
         getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
                 "/monitoring/" + jobId + "/status", "pending".getBytes());
+        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                "/registry/" + getProcessId() + "/jobs/" + jobId, new byte[0]);
     }
 
     @SuppressWarnings("WeakerAccess")
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index f01c2f5..5c0df31 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -60,6 +60,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
 
 import java.util.*;
@@ -90,29 +91,59 @@ public class PostWorkflowManager {
         return consumer;
     }
 
+    private void registerWorkflow(String processId, String workflowId) throws Exception {
+        this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                "/registry/" + processId + "/workflows/" + workflowId , new byte[0]);
+    }
+
     private String getExperimentIdByJobId(String jobId) throws Exception {
-        byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/experiment");
-        return new String(processBytes);
+        String path = "/monitoring/" + jobId + "/experiment";
+        if (this.curatorClient.checkExists().forPath(path) != null) {
+            byte[] processBytes = this.curatorClient.getData().forPath(path);
+            return new String(processBytes);
+        } else {
+            return null;
+        }
     }
 
     private String getTaskIdByJobId(String jobId) throws Exception {
-        byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/task");
-        return new String(processBytes);
+        String path = "/monitoring/" + jobId + "/task";
+        if (this.curatorClient.checkExists().forPath(path) != null) {
+            byte[] processBytes = this.curatorClient.getData().forPath(path);
+            return new String(processBytes);
+        } else {
+            return null;
+        }
     }
 
     private String getProcessIdByJobId(String jobId) throws Exception {
-        byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/process");
-        return new String(processBytes);
+        String path = "/monitoring/" + jobId + "/process";
+        if (this.curatorClient.checkExists().forPath(path) != null) {
+            byte[] processBytes = this.curatorClient.getData().forPath(path);
+            return new String(processBytes);
+        } else {
+            return null;
+        }
     }
 
     private String getGatewayByJobId(String jobId) throws Exception {
-        byte[] gatewayBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/gateway");
-        return new String(gatewayBytes);
+        String path = "/monitoring/" + jobId + "/gateway";
+        if (this.curatorClient.checkExists().forPath(path) != null) {
+            byte[] gatewayBytes = this.curatorClient.getData().forPath(path);
+            return new String(gatewayBytes);
+        } else {
+            return null;
+        }
     }
 
-    private String getStatusByJobId(String jobId) throws Exception {
-        byte[] statusBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/status");
-        return new String(statusBytes);
+    private String getStatusByProcess(String processId) throws Exception {
+        String path = "/registry/" + processId + "/status";
+        if (this.curatorClient.checkExists().forPath(path) != null) {
+            byte[] statusBytes = this.curatorClient.getData().forPath(path);
+            return new String(statusBytes);
+        } else {
+            return null;
+        }
     }
 
     private boolean hasMonitoringRegistered(String jobId) throws Exception {
@@ -130,23 +161,36 @@ public class PostWorkflowManager {
             logger.info("Processing job result " + jobStatusResult.getJobId());
 
             if (hasMonitoringRegistered(jobStatusResult.getJobId())) {
-                String gateway = getGatewayByJobId(jobStatusResult.getJobId());
-                String processId = getProcessIdByJobId(jobStatusResult.getJobId());
-                String experimentId = getExperimentIdByJobId(jobStatusResult.getJobId());
-                String task = getTaskIdByJobId(jobStatusResult.getJobId());
-                String status = getStatusByJobId(jobStatusResult.getJobId());
+                String gateway = Optional.ofNullable(getGatewayByJobId(jobStatusResult.getJobId()))
+                        .orElseThrow(() -> new Exception("Can not find the gateway for job id " + jobStatusResult.getJobId()));
+
+                String processId = Optional.ofNullable(getProcessIdByJobId(jobStatusResult.getJobId()))
+                        .orElseThrow(() -> new Exception("Can not find the process for job id " + jobStatusResult.getJobId()));
 
-                logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id "
-                        + processId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
+                String experimentId = Optional.ofNullable(getExperimentIdByJobId(jobStatusResult.getJobId()))
+                        .orElseThrow(() -> new Exception("Can not find the experiment for job id " + jobStatusResult.getJobId()));
+
+                String task = Optional.ofNullable(getTaskIdByJobId(jobStatusResult.getJobId()))
+                        .orElseThrow(() -> new Exception("Can not find the task for job id " + jobStatusResult.getJobId()));
+
+                String status = getStatusByProcess(processId);
 
                 // TODO get cluster lock before that
-                if ("cancelled".equals(status)) {
+                if ("cancel".equals(status)) {
+                    logger.info("Cancelled post workflow for process " + processId);
                     // TODO to be implemented
                 } else {
 
+                    logger.info("Updating the job status for job id : " + jobStatusResult.getJobId() + " with process id "
+                            + processId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
+
                     saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState());
 
                     if (jobStatusResult.getState() == JobState.COMPLETE) {
+
+                        logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id "
+                                + processId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
+
                         logger.info("Job " + jobStatusResult.getJobId() + " was completed");
 
                         ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway);
@@ -211,9 +255,14 @@ public class PostWorkflowManager {
                                 ServerSettings.getSetting("post.workflow.manager.name"),
                                 ServerSettings.getZookeeperConnection());
 
-                        workflowManager.launchWorkflow(processId + "-POST-" + UUID.randomUUID().toString(),
+                        String workflowName = workflowManager.launchWorkflow(processId + "-POST-" + UUID.randomUUID().toString(),
                                 new ArrayList<>(allTasks), true, false);
-
+                        try {
+                            registerWorkflow(processId, workflowName);
+                        } catch (Exception e) {
+                            logger.error("Failed to save workflow " + workflowName + " of process " + processId + " in zookeeper registry. " +
+                                    "This will affect cancellation tasks", e);
+                        }
                     } else if (jobStatusResult.getState() == JobState.CANCELED) {
                         logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled");
 
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index ff5f9ea..ff570ae 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -20,11 +20,13 @@
 package org.apache.airavata.helix.impl.workflow;
 
 import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.helix.core.AbstractTask;
 import org.apache.airavata.helix.core.OutPort;
 import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask;
 import org.apache.airavata.helix.impl.task.env.EnvSetupTask;
 import org.apache.airavata.helix.impl.task.staging.InputDataStagingTask;
 import org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask;
@@ -33,16 +35,22 @@ import org.apache.airavata.messaging.core.*;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.ProcessTerminateEvent;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.task.TaskModel;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
+import org.apache.zookeeper.CreateMode;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -54,13 +62,36 @@ public class PreWorkflowManager {
 
     private static final Logger logger = LogManager.getLogger(PreWorkflowManager.class);
 
-    private final Subscriber subscriber;
+    private Subscriber subscriber;
+    private CuratorFramework curatorClient = null;
 
     @SuppressWarnings("WeakerAccess")
     public PreWorkflowManager() throws AiravataException {
+        init();
+    }
+
+    private void init() throws AiravataException {
         List<String> routingKeys = new ArrayList<>();
         routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName());
         this.subscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(), routingKeys, Type.PROCESS_LAUNCH);
+
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
+        this.curatorClient.start();
+    }
+
+    private void registerWorkflow(String processId, String workflowId) throws Exception {
+        this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                "/registry/" + processId + "/workflows/" + workflowId , new byte[0]);
+    }
+
+    private void registerCancelProcess(String processId) throws Exception {
+        this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                "/registry/" + processId + "/status" , "cancel".getBytes());
+    }
+
+    private List<String> getWorkflowsOfProcess(String processId) throws Exception {
+        return this.curatorClient.getChildren().forPath("/registry/" + processId + "/workflows");
     }
 
     private String createAndLaunchPreWorkflow(String processId, String gateway) throws Exception {
@@ -90,9 +121,7 @@ public class PreWorkflowManager {
                     airavataTask.setRetryCount(1);
                     jobSubmissionFound = true;
                 } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) {
-                    if (jobSubmissionFound) {
-                        //airavataTask = new OutputDataStagingTask();
-                    } else {
+                    if (!jobSubmissionFound) {
                         airavataTask = new InputDataStagingTask();
                     }
                 }
@@ -116,9 +145,42 @@ public class PreWorkflowManager {
                 ServerSettings.getZookeeperConnection());
         String workflowName = workflowManager.launchWorkflow(processId + "-PRE-" + UUID.randomUUID().toString(),
                 new ArrayList<>(allTasks), true, false);
+        try {
+            registerWorkflow(processId, workflowName);
+        } catch (Exception e) {
+            logger.error("Failed to save workflow " + workflowName + " of process " + processId + " in zookeeper registry. " +
+                    "This will affect cancellation tasks", e);
+        }
         return workflowName;
     }
 
+    private String createAndLaunchCancelWorkflow(String processId, String gateway) throws Exception {
+        registerCancelProcess(processId);
+        List<String> workflows = getWorkflowsOfProcess(processId);
+        final List<AbstractTask> allTasks = new ArrayList<>();
+        if (workflows != null && workflows.size() > 0) {
+            for (String wf : workflows) {
+                logger.info("Creating cancellation task for workflow " + wf + " of process " + processId);
+                WorkflowCancellationTask wfct = new WorkflowCancellationTask();
+                wfct.setTaskId(UUID.randomUUID().toString());
+                wfct.setCancellingWorkflowName(wf);
+                allTasks.add(wfct);
+            }
+
+            WorkflowManager workflowManager = new WorkflowManager(
+                    ServerSettings.getSetting("helix.cluster.name"),
+                    ServerSettings.getSetting("post.workflow.manager.name"),
+                    ServerSettings.getZookeeperConnection());
+
+            String workflow = workflowManager.launchWorkflow(processId + "-CANCEL-" + UUID.randomUUID().toString(), allTasks, true, false);
+            logger.info("Started launching workflow " + workflow + " to cancel process " + processId);
+            return workflow;
+        } else {
+            logger.info("No workflow registered with process " + processId + " to cancel");
+            return null;
+        }
+    }
+
     public static void main(String[] args) throws Exception {
         PreWorkflowManager preWorkflowManager = new PreWorkflowManager();
     }
@@ -147,14 +209,40 @@ public class PreWorkflowManager {
                 logger.info("Received process launch message for process " + processId + " in gateway " + gateway);
 
                 try {
-                    logger.info("Launching the pre workflow for process " + processId + " in gateway " + gateway );
+                    logger.info("Launching the pre workflow for process " + processId + " in gateway " + gateway);
                     String workflowName = createAndLaunchPreWorkflow(processId, gateway);
-                    logger.info("Completed launching the pre workflow " + workflowName + " for process " + processId + " in gateway " + gateway );
+                    logger.info("Completed launching the pre workflow " + workflowName + " for process " + processId + " in gateway " + gateway);
                     subscriber.sendAck(messageContext.getDeliveryTag());
                 } catch (Exception e) {
                     logger.error("Failed to launch the pre workflow for process " + processId + " in gateway " + gateway, e);
                     //subscriber.sendAck(messageContext.getDeliveryTag());
                 }
+            } else if (messageContext.getType().equals(MessageType.TERMINATEPROCESS)) {
+                ProcessTerminateEvent event = new ProcessTerminateEvent();
+                TBase messageEvent = messageContext.getEvent();
+
+                try {
+                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                    ThriftUtils.createThriftFromBytes(bytes, event);
+                } catch (TException e) {
+                    logger.error("Failed to fetch process cancellation event", e);
+                    subscriber.sendAck(messageContext.getDeliveryTag());
+                }
+
+                String processId = event.getProcessId();
+                String gateway = event.getGatewayId();
+
+                logger.info("Received process cancel message for process " + processId + " in gateway " + gateway);
+
+                try {
+                    logger.info("Launching the process cancel workflow for process " + processId + " in gateway " + gateway);
+                    String workflowName = createAndLaunchCancelWorkflow(processId, gateway);
+                    logger.info("Completed process cancel workflow " + workflowName + " for process " + processId + " in gateway " + gateway);
+                    subscriber.sendAck(messageContext.getDeliveryTag());
+                } catch (Exception e) {
+                    logger.error("Failed to launch process cancel workflow for process " + processId + " in gateway " + gateway, e);
+                    //subscriber.sendAck(messageContext.getDeliveryTag());
+                }
             } else {
                 logger.warn("Unknown message type");
                 subscriber.sendAck(messageContext.getDeliveryTag());
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
index bae36e6..2c4b5a8 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
@@ -21,12 +21,16 @@ package org.apache.airavata.helix.core;
 
 import org.apache.airavata.helix.core.util.TaskUtil;
 import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
 import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.airavata.model.status.TaskState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 /**
  * TODO: Class level comments please
@@ -36,12 +40,17 @@ import org.apache.helix.task.UserContentStore;
  */
 public abstract class AbstractTask extends UserContentStore implements Task {
 
+    private static final Logger logger = LogManager.getLogger(AbstractTask.class);
+
     private static final String NEXT_JOB = "next-job";
     private static final String WORKFLOW_STARTED = "workflow-started";
 
     @TaskParam(name = "taskId")
     private String taskId;
 
+    @TaskOutPort(name = "Next Task")
+    private OutPort nextTask;
+
     private TaskCallbackContext callbackContext;
     private TaskHelper taskHelper;
 
@@ -71,6 +80,7 @@ public abstract class AbstractTask extends UserContentStore implements Task {
 
     @Override
     public final void cancel() {
+        logger.info("Cancelling task " + taskId);
         onCancel();
     }
 
@@ -78,6 +88,15 @@ public abstract class AbstractTask extends UserContentStore implements Task {
 
     public abstract void onCancel();
 
+    protected TaskResult onSuccess(String message) {
+        String successMessage = "Task " + getTaskId() + " completed." + (message != null ? " Message : " + message : "");
+        logger.info(successMessage);
+        return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
+    }
+
+    protected TaskResult onFail(String reason, boolean fatal) {
+        return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, reason);
+    }
     protected void publishErrors(Throwable e) {
         // TODO Publish through kafka channel with task and workflow id
         e.printStackTrace();
@@ -134,4 +153,22 @@ public abstract class AbstractTask extends UserContentStore implements Task {
     public void setRetryCount(int retryCount) {
         this.retryCount = retryCount;
     }
+
+    /**
+     * Returns the out-port that successful results are routed through.
+     *
+     * @return the "Next Task" out-port, or {@code null} if none has been set
+     */
+    public OutPort getNextTask() {
+        return this.nextTask;
+    }
+
+    /**
+     * Sets the out-port that successful results are routed through.
+     *
+     * @param nextTask the "Next Task" out-port
+     */
+    public void setNextTask(OutPort nextTask) {
+        this.nextTask = nextTask;
+    }
 }
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
index f58b365..4514f3a 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
@@ -25,10 +25,7 @@ import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
 import org.apache.airavata.helix.task.api.annotation.TaskParam;
 
 import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * TODO: Class level comments please
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
index 8cf9874..f6f1127 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
@@ -32,6 +32,7 @@ import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.model.messaging.event.Message;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.ProcessTerminateEvent;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -79,6 +80,19 @@ public class ProcessConsumer extends QueueingConsumer{
                 messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
                 messageContext.setIsRedeliver(envelope.isRedeliver());
                 handler.onMessage(messageContext);
+            } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) {
+                ProcessTerminateEvent processTerminateEvent = new ProcessTerminateEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), processTerminateEvent);
+                log.info(" Message Received with message id '" + message.getMessageId()
+                        + " and with message type:" + message.getMessageType() + ", for processId:" +
+                        processTerminateEvent.getProcessId());
+                event = processTerminateEvent;
+                gatewayId = processTerminateEvent.getGatewayId();
+                MessageContext messageContext = new MessageContext(event, message.getMessageType(),
+                        message.getMessageId(), gatewayId, deliveryTag);
+                messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+                messageContext.setIsRedeliver(envelope.isRedeliver());
+                handler.onMessage(messageContext);
             } else {
                 log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " +
                         "delivery tag {} ", message.getMessageType().name(), deliveryTag);
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
index 8277387..9aa788e 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
@@ -71,11 +71,10 @@ public interface Orchestrator {
      * We just have to give the experimentID
      *
      * @param experiment
-     * @param processModel
      * @param tokenId
      * @throws OrchestratorException
      */
-    void cancelExperiment(ExperimentModel experiment, ProcessModel processModel, String tokenId) throws OrchestratorException;
+    void cancelExperiment(ExperimentModel experiment, String tokenId) throws OrchestratorException;
     //todo have to add another method to handle failed or jobs to be recovered by orchestrator
     //todo if you don't add these this is not an orchestrator, its just an intemediate component which invoke gfac
 
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index ff4a5e6..53f2a94 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -239,19 +239,60 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
     }
 
 
-    public void cancelExperiment(ExperimentModel experiment, ProcessModel processModel, String tokenId)
-            throws OrchestratorException {
-        // FIXME
-//        List<JobDetails> jobDetailsList = task.getJobDetailsList();
-//        for(JobDetails jobDetails:jobDetailsList) {
-//            JobState jobState = jobDetails.getJobStatuses().getJobState();
-//            if (jobState.getValue() > 4){
-//                logger.error("Cannot cancel the job, because current job state is : " + jobState.toString() +
-//                "jobId: " + jobDetails.getJobID() + " Job Name: " + jobDetails.getJobName());
-//                return;
-//            }
-//        }
-//        jobSubmitter.terminate(experiment.getExperimentID(),task.getTaskID(),tokenId);
+    /**
+     * Requests termination of every process that belongs to the given experiment.
+     * <p>
+     * Termination is attempted for all processes even if some requests fail, so one
+     * failing process does not leave its siblings running; failures are reported
+     * together once every process has been tried.
+     *
+     * @param experiment experiment whose processes should be terminated
+     * @param tokenId    credential store token handed to the job submitter
+     * @throws OrchestratorException if the process ids cannot be fetched from the
+     *                               registry, or if terminating any process fails
+     */
+    public void cancelExperiment(ExperimentModel experiment, String tokenId) throws OrchestratorException {
+        String experimentId = experiment.getExperimentId();
+        logger.info("Terminating experiment " + experimentId);
+        // NOTE(review): if getRegistryServiceClient() opens a new Thrift connection per call,
+        // its transport should be closed after the lookup — confirm against its implementation.
+        RegistryService.Client registryServiceClient = getRegistryServiceClient();
+
+        List<String> processIds;
+        try {
+            processIds = registryServiceClient.getProcessIds(experimentId);
+        } catch (TException e) {
+            logger.error("Failed to fetch process ids for experiment " + experimentId, e);
+            throw new OrchestratorException("Failed to fetch process ids for experiment " + experimentId, e);
+        }
+
+        if (processIds == null || processIds.isEmpty()) {
+            logger.warn("No processes found for experiment " + experimentId + " to cancel");
+            return;
+        }
+
+        // Keep going after a failure so the remaining processes still get a termination request.
+        OrchestratorException firstFailure = null;
+        StringBuilder failedProcessIds = new StringBuilder();
+        for (String processId : processIds) {
+            logger.info("Terminating process " + processId + " of experiment " + experimentId);
+            try {
+                jobSubmitter.terminate(experimentId, processId, tokenId);
+            } catch (OrchestratorException e) {
+                logger.error("Failed to terminate process " + processId + " of experiment " + experimentId, e);
+                if (firstFailure == null) {
+                    firstFailure = e;
+                } else {
+                    firstFailure.addSuppressed(e);
+                    failedProcessIds.append(", ");
+                }
+                failedProcessIds.append(processId);
+            }
+        }
+        if (firstFailure != null) {
+            throw new OrchestratorException("Failed to terminate process(es) " + failedProcessIds
+                    + " of experiment " + experimentId, firstFailure);
+        }
     }
 
 
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 0934d1a..5fd122b 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -428,6 +428,26 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 				log.warn("Experiment termination is only allowed for launched experiments.");
 				return false;
 			default:
+				ExperimentModel experimentModel = registryClient.getExperiment(experimentId);
+
+                ComputeResourcePreference computeResourcePreference = registryClient.getGatewayComputeResourcePreference
+                        (gatewayId,
+                                experimentModel.getUserConfigurationData().getComputationalResourceScheduling().getResourceHostId());
+                String token = computeResourcePreference.getResourceSpecificCredentialStoreToken();
+                if (token == null || token.isEmpty()){
+                    // try with gateway profile level token
+                    GatewayResourceProfile gatewayProfile = registryClient.getGatewayResourceProfile(gatewayId);
+                    token = gatewayProfile.getCredentialStoreToken();
+                }
+                // still the token is empty, then we fail the experiment
+                if (token == null || token.isEmpty()){
+                    log.error("You have not configured credential store token at gateway profile or compute resource preference." +
+                            " Please provide the correct token at gateway profile or compute resource preference.");
+                    return false;
+                }
+
+				orchestrator.cancelExperiment(experimentModel, token);
+				// TODO deprecate this approach as we are replacing gfac
 				String expCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
 						experimentId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
 				Stat stat = curatorClient.checkExists().forPath(expCancelNodePath);

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.