You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:10:06 UTC
[airavata] 09/17: Configuring pre workflow manager to read from
rabbitmq launch queue
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit ca455645a09da0c8184a741c7ee5cb3853338d70
Author: dimuthu <di...@gmail.com>
AuthorDate: Sun Mar 4 20:48:23 2018 -0500
Configuring pre workflow manager to read from rabbitmq launch queue
---
.../airavata/helix/workflow/WorkflowManager.java | 22 ++++--
modules/helix-spectator/pom.xml | 5 ++
.../submission/task/DefaultJobSubmissionTask.java | 2 +-
.../helix/impl/workflow/PostWorkflowManager.java | 18 ++---
.../helix/impl/workflow/PreWorkflowManager.java | 92 ++++++++++++++++++----
5 files changed, 102 insertions(+), 37 deletions(-)
diff --git a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
index 9ecafb9..e3d07b7 100644
--- a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
+++ b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
@@ -2,13 +2,14 @@ package org.apache.airavata.helix.workflow;
import org.apache.airavata.helix.core.AbstractTask;
import org.apache.airavata.helix.core.OutPort;
-import org.apache.airavata.helix.core.util.*;
import org.apache.airavata.helix.core.util.TaskUtil;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.task.*;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
@@ -22,6 +23,8 @@ import java.util.Map;
*/
public class WorkflowManager {
+ private static final Logger logger = LogManager.getLogger(WorkflowManager.class);
+
private static final String WORKFLOW_PREFIX = "Workflow_of_process_";
private TaskDriver taskDriver;
@@ -43,9 +46,12 @@ public class WorkflowManager {
taskDriver = new TaskDriver(helixManager);
}
- public void launchWorkflow(String processId, List<AbstractTask> tasks, boolean globalParticipant) throws Exception {
+ public String launchWorkflow(String processId, List<AbstractTask> tasks, boolean globalParticipant, boolean monitor) throws Exception {
+
+ String workflowName = WORKFLOW_PREFIX + processId;
+ logger.info("Launching workflow " + workflowName + " for process " + processId);
- Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW_PREFIX + processId).setExpiry(0);
+ Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName).setExpiry(0);
for (int i = 0; i < tasks.size(); i++) {
AbstractTask data = tasks.get(i);
@@ -86,9 +92,13 @@ public class WorkflowManager {
//TODO : Do we need to monitor workflow status? If so how do we do it in a scalable manner? For example,
// if the hfac that monitors a particular workflow, got killed due to some reason, who is taking the responsibility
- TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(),
- TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED);
- System.out.println("Workflow finished with state " + taskState.name());
+ if (monitor) {
+ TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(),
+ TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED);
+ logger.info("Workflow " + workflowName + " for process " + processId + " finished with state " + taskState.name());
+
+ }
+ return workflowName;
}
}
\ No newline at end of file
diff --git a/modules/helix-spectator/pom.xml b/modules/helix-spectator/pom.xml
index 213f747..326d7ef 100644
--- a/modules/helix-spectator/pom.xml
+++ b/modules/helix-spectator/pom.xml
@@ -60,5 +60,10 @@
<artifactId>job-monitor</artifactId>
<version>0.17-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-messaging-core</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
index a60a955..31b6f30 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -200,7 +200,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
//taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
//taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
return onFail("Couldn't find job id in both submitted and verified steps", false, null);
- }else {
+ } else {
//GFacUtils.saveJobModel(processContext, jobModel);
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 25f8ec5..383fe37 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -103,6 +103,9 @@ public class PostWorkflowManager {
String processId = getProcessIdByJobId(jobStatusResult.getJobId());
String status = getStatusByJobId(jobStatusResult.getJobId());
+ logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id "
+ + processId + ", gateway " + gateway + " and status " + status);
+
// TODO get cluster lock before that
if ("cancelled".equals(status)) {
@@ -151,8 +154,8 @@ public class PostWorkflowManager {
WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster",
"wm-23", ServerSettings.getZookeeperConnection());
- workflowManager.launchWorkflow(UUID.randomUUID().toString(),
- allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true);
+ workflowManager.launchWorkflow(processId + "-POST-" + UUID.randomUUID().toString(),
+ allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true, false);
} else if (jobStatusResult.getState() == JobState.CANCELED) {
logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled");
@@ -176,25 +179,14 @@ public class PostWorkflowManager {
private void runConsumer() throws InterruptedException {
final Consumer<String, JobStatusResult> consumer = createConsumer();
- final int giveUp = 100; int noRecordsCount = 0;
-
while (true) {
final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(1000);
-
- /*if (consumerRecords.count() == 0) {
- noRecordsCount++;
- if (noRecordsCount > giveUp) break;
- else continue;
- }*/
-
consumerRecords.forEach(record -> {
process(record.value());
});
consumer.commitAsync();
}
- //consumer.close();
- //System.out.println("DONE");
}
public static void main(String[] args) throws Exception {
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 9814b01..3030375 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -1,21 +1,29 @@
package org.apache.airavata.helix.impl.workflow;
-import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.helix.core.OutPort;
import org.apache.airavata.helix.impl.task.AiravataTask;
import org.apache.airavata.helix.impl.task.EnvSetupTask;
import org.apache.airavata.helix.impl.task.InputDataStagingTask;
-import org.apache.airavata.helix.impl.task.OutputDataStagingTask;
import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask;
import org.apache.airavata.helix.workflow.WorkflowManager;
+import org.apache.airavata.messaging.core.*;
import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.task.TaskModel;
import org.apache.airavata.model.task.TaskTypes;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.List;
@@ -25,11 +33,28 @@ import java.util.stream.Collectors;
public class PreWorkflowManager {
+ private static final Logger logger = LogManager.getLogger(PreWorkflowManager.class);
+
+ private final Subscriber subscriber;
+
+ public PreWorkflowManager() throws AiravataException {
+ List<String> routingKeys = new ArrayList<>();
+ routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName());
+ this.subscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(), routingKeys, Type.PROCESS_LAUNCH);
+ }
+
public static void main(String[] args) throws Exception {
- String processId = "PROCESS_5b252ad9-d630-4cf9-80e3-0c30c55d1001";
- AppCatalog appCatalog = RegistryFactory.getAppCatalog();
- ExperimentCatalog experimentCatalog = RegistryFactory.getDefaultExpCatalog();
+ PreWorkflowManager preWorkflowManager = new PreWorkflowManager();
+
+ //String processId = "PROCESS_5b252ad9-d630-4cf9-80e3-0c30c55d1001";
+ //AppCatalog appCatalog = RegistryFactory.getAppCatalog();
+
+ }
+
+ private String createAndLaunchPreWorkflow(String processId, String gateway) throws Exception {
+
+ ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway);
ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, processModel.getExperimentId());
@@ -74,16 +99,49 @@ public class PreWorkflowManager {
}
}
-/* DefaultJobSubmissionTask defaultJobSubmissionTask = new DefaultJobSubmissionTask();
- defaultJobSubmissionTask.setGatewayId("default");
- defaultJobSubmissionTask.setExperimentId("Clone_of_Mothur-Test1_0c9f627e-2c32-403e-a28a-2a8b10c21c1a");
- defaultJobSubmissionTask.setProcessId("PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6");
- defaultJobSubmissionTask.setTaskId("TASK_612844a4-aedb-41a5-824f-9b20c76867f7");
-
- List<AbstractTask> tasks = new ArrayList<>();
- tasks.add(defaultJobSubmissionTask);
-*/
- WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22", "localhost:2199");
- workflowManager.launchWorkflow(UUID.randomUUID().toString(), allTasks.stream().map(t -> (AiravataTask)t).collect(Collectors.toList()), true);
+ WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22",
+ ServerSettings.getZookeeperConnection());
+ String workflowName = workflowManager.launchWorkflow(processId + "-PRE-" + UUID.randomUUID().toString(),
+ allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true, false);
+ return workflowName;
+ }
+
+ private class ProcessLaunchMessageHandler implements MessageHandler {
+
+ @Override
+ public void onMessage(MessageContext messageContext) {
+ logger.info(" Message Received with message id " + messageContext.getMessageId() + " and with message type: " + messageContext.getType());
+
+ if (messageContext.getType().equals(MessageType.LAUNCHPROCESS)) {
+ ProcessSubmitEvent event = new ProcessSubmitEvent();
+ TBase messageEvent = messageContext.getEvent();
+
+ try {
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ } catch (TException e) {
+ logger.error("Failed to fetch process submit event", e);
+ subscriber.sendAck(messageContext.getDeliveryTag());
+ }
+
+ String processId = event.getProcessId();
+ String gateway = event.getGatewayId();
+
+ logger.info("Received process launch message for process " + processId + " in gateway " + gateway);
+
+ try {
+ logger.info("Launching the pre workflow for process " + processId + " in gateway " + gateway );
+ String workflowName = createAndLaunchPreWorkflow(processId, gateway);
+ logger.info("Completed launching the pre workflow " + workflowName + " for process " + processId + " in gateway " + gateway );
+ subscriber.sendAck(messageContext.getDeliveryTag());
+ } catch (Exception e) {
+ logger.error("Failed to launch the pre workflow for process " + processId + " in gateway " + gateway, e);
+ subscriber.sendAck(messageContext.getDeliveryTag());
+ }
+ } else {
+ logger.warn("Unknown message type");
+ subscriber.sendAck(messageContext.getDeliveryTag());
+ }
+ }
}
}
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.