You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:10:06 UTC

[airavata] 09/17: Configuring pre workflow manager to read from rabbitmq launch queue

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit ca455645a09da0c8184a741c7ee5cb3853338d70
Author: dimuthu <di...@gmail.com>
AuthorDate: Sun Mar 4 20:48:23 2018 -0500

    Configuring pre workflow manager to read from rabbitmq launch queue
---
 .../airavata/helix/workflow/WorkflowManager.java   | 22 ++++--
 modules/helix-spectator/pom.xml                    |  5 ++
 .../submission/task/DefaultJobSubmissionTask.java  |  2 +-
 .../helix/impl/workflow/PostWorkflowManager.java   | 18 ++---
 .../helix/impl/workflow/PreWorkflowManager.java    | 92 ++++++++++++++++++----
 5 files changed, 102 insertions(+), 37 deletions(-)

diff --git a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
index 9ecafb9..e3d07b7 100644
--- a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
+++ b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
@@ -2,13 +2,14 @@ package org.apache.airavata.helix.workflow;
 
 import org.apache.airavata.helix.core.AbstractTask;
 import org.apache.airavata.helix.core.OutPort;
-import org.apache.airavata.helix.core.util.*;
 import org.apache.airavata.helix.core.util.TaskUtil;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.task.*;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -22,6 +23,8 @@ import java.util.Map;
  */
 public class WorkflowManager {
 
+    private static final Logger logger = LogManager.getLogger(WorkflowManager.class);
+
     private static final String WORKFLOW_PREFIX = "Workflow_of_process_";
     private TaskDriver taskDriver;
 
@@ -43,9 +46,12 @@ public class WorkflowManager {
         taskDriver = new TaskDriver(helixManager);
     }
 
-    public void launchWorkflow(String processId, List<AbstractTask> tasks, boolean globalParticipant) throws Exception {
+    public String launchWorkflow(String processId, List<AbstractTask> tasks, boolean globalParticipant, boolean monitor) throws Exception {
+
+        String workflowName = WORKFLOW_PREFIX + processId;
+        logger.info("Launching workflow " + workflowName + " for process " + processId);
 
-        Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW_PREFIX + processId).setExpiry(0);
+        Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName).setExpiry(0);
 
         for (int i = 0; i < tasks.size(); i++) {
             AbstractTask data = tasks.get(i);
@@ -86,9 +92,13 @@ public class WorkflowManager {
         //TODO : Do we need to monitor workflow status? If so how do we do it in a scalable manner? For example,
         // if the hfac that monitors a particular workflow, got killed due to some reason, who is taking the responsibility
 
-        TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(),
-                TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED);
-        System.out.println("Workflow finished with state " + taskState.name());
+        if (monitor) {
+            TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(),
+                    TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED);
+            logger.info("Workflow " + workflowName + " for process " + processId + " finished with state " + taskState.name());
+
+        }
+        return workflowName;
 
     }
 }
\ No newline at end of file
diff --git a/modules/helix-spectator/pom.xml b/modules/helix-spectator/pom.xml
index 213f747..326d7ef 100644
--- a/modules/helix-spectator/pom.xml
+++ b/modules/helix-spectator/pom.xml
@@ -60,5 +60,10 @@
             <artifactId>job-monitor</artifactId>
             <version>0.17-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-messaging-core</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
index a60a955..31b6f30 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -200,7 +200,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
                     //taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
                     //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                     return onFail("Couldn't find job id in both submitted and verified steps", false, null);
-                }else {
+                } else {
                     //GFacUtils.saveJobModel(processContext, jobModel);
                 }
 
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 25f8ec5..383fe37 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -103,6 +103,9 @@ public class PostWorkflowManager {
                 String processId = getProcessIdByJobId(jobStatusResult.getJobId());
                 String status = getStatusByJobId(jobStatusResult.getJobId());
 
+                logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id "
+                        + processId + ", gateway " + gateway + " and status " + status);
+
                 // TODO get cluster lock before that
                 if ("cancelled".equals(status)) {
 
@@ -151,8 +154,8 @@ public class PostWorkflowManager {
                         WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster",
                                 "wm-23", ServerSettings.getZookeeperConnection());
 
-                        workflowManager.launchWorkflow(UUID.randomUUID().toString(),
-                                allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true);
+                        workflowManager.launchWorkflow(processId + "-POST-" + UUID.randomUUID().toString(),
+                                allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true, false);
 
                     } else if (jobStatusResult.getState() == JobState.CANCELED) {
                         logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled");
@@ -176,25 +179,14 @@ public class PostWorkflowManager {
     private void runConsumer() throws InterruptedException {
         final Consumer<String, JobStatusResult> consumer = createConsumer();
 
-        final int giveUp = 100;   int noRecordsCount = 0;
-
         while (true) {
             final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(1000);
-
-            /*if (consumerRecords.count() == 0) {
-                noRecordsCount++;
-                if (noRecordsCount > giveUp) break;
-                else continue;
-            }*/
-
             consumerRecords.forEach(record -> {
                 process(record.value());
             });
 
             consumer.commitAsync();
         }
-        //consumer.close();
-        //System.out.println("DONE");
     }
 
     public static void main(String[] args) throws Exception {
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 9814b01..3030375 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -1,21 +1,29 @@
 package org.apache.airavata.helix.impl.workflow;
 
-import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.helix.core.OutPort;
 import org.apache.airavata.helix.impl.task.AiravataTask;
 import org.apache.airavata.helix.impl.task.EnvSetupTask;
 import org.apache.airavata.helix.impl.task.InputDataStagingTask;
-import org.apache.airavata.helix.impl.task.OutputDataStagingTask;
 import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask;
 import org.apache.airavata.helix.workflow.WorkflowManager;
+import org.apache.airavata.messaging.core.*;
 import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.task.TaskModel;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -25,11 +33,28 @@ import java.util.stream.Collectors;
 
 public class PreWorkflowManager {
 
+    private static final Logger logger = LogManager.getLogger(PreWorkflowManager.class);
+
+    private final Subscriber subscriber;
+
+    public PreWorkflowManager() throws AiravataException {
+        List<String> routingKeys = new ArrayList<>();
+        routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName());
+        this.subscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(), routingKeys, Type.PROCESS_LAUNCH);
+    }
+
     public static void main(String[] args) throws Exception {
 
-        String processId = "PROCESS_5b252ad9-d630-4cf9-80e3-0c30c55d1001";
-        AppCatalog appCatalog = RegistryFactory.getAppCatalog();
-        ExperimentCatalog experimentCatalog = RegistryFactory.getDefaultExpCatalog();
+        PreWorkflowManager preWorkflowManager = new PreWorkflowManager();
+
+        //String processId = "PROCESS_5b252ad9-d630-4cf9-80e3-0c30c55d1001";
+        //AppCatalog appCatalog = RegistryFactory.getAppCatalog();
+
+    }
+
+    private String createAndLaunchPreWorkflow(String processId, String gateway) throws Exception {
+
+        ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway);
 
         ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
         ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, processModel.getExperimentId());
@@ -74,16 +99,49 @@ public class PreWorkflowManager {
             }
         }
 
-/*        DefaultJobSubmissionTask defaultJobSubmissionTask = new DefaultJobSubmissionTask();
-        defaultJobSubmissionTask.setGatewayId("default");
-        defaultJobSubmissionTask.setExperimentId("Clone_of_Mothur-Test1_0c9f627e-2c32-403e-a28a-2a8b10c21c1a");
-        defaultJobSubmissionTask.setProcessId("PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6");
-        defaultJobSubmissionTask.setTaskId("TASK_612844a4-aedb-41a5-824f-9b20c76867f7");
-
-        List<AbstractTask> tasks = new ArrayList<>();
-        tasks.add(defaultJobSubmissionTask);
-*/
-        WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22", "localhost:2199");
-        workflowManager.launchWorkflow(UUID.randomUUID().toString(), allTasks.stream().map(t -> (AiravataTask)t).collect(Collectors.toList()), true);
+        WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22",
+                ServerSettings.getZookeeperConnection());
+        String workflowName = workflowManager.launchWorkflow(processId + "-PRE-" + UUID.randomUUID().toString(),
+                allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true, false);
+        return workflowName;
+    }
+
+    private class ProcessLaunchMessageHandler implements MessageHandler {
+
+        @Override
+        public void onMessage(MessageContext messageContext) {
+            logger.info(" Message Received with message id " + messageContext.getMessageId() + " and with message type: " + messageContext.getType());
+
+            if (messageContext.getType().equals(MessageType.LAUNCHPROCESS)) {
+                ProcessSubmitEvent event = new ProcessSubmitEvent();
+                TBase messageEvent = messageContext.getEvent();
+
+                try {
+                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                    ThriftUtils.createThriftFromBytes(bytes, event);
+                } catch (TException e) {
+                    logger.error("Failed to fetch process submit event", e);
+                    subscriber.sendAck(messageContext.getDeliveryTag());
+                }
+
+                String processId = event.getProcessId();
+                String gateway = event.getGatewayId();
+
+                logger.info("Received process launch message for process " + processId + " in gateway " + gateway);
+
+                try {
+                    logger.info("Launching the pre workflow for process " + processId + " in gateway " + gateway );
+                    String workflowName = createAndLaunchPreWorkflow(processId, gateway);
+                    logger.info("Completed launching the pre workflow " + workflowName + " for process " + processId + " in gateway " + gateway );
+                    subscriber.sendAck(messageContext.getDeliveryTag());
+                } catch (Exception e) {
+                    logger.error("Failed to launch the pre workflow for process " + processId + " in gateway " + gateway, e);
+                    subscriber.sendAck(messageContext.getDeliveryTag());
+                }
+            } else {
+                logger.warn("Unknown message type");
+                subscriber.sendAck(messageContext.getDeliveryTag());
+            }
+        }
     }
 }

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.