Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:10:05 UTC

[airavata] 08/17: Implementing post workflow

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 42ff5f4e2fbdbb1421f686b702a5eb76918bb4d3
Author: dimuthu <di...@gmail.com>
AuthorDate: Sun Mar 4 13:21:55 2018 -0500

    Implementing post workflow
---
 modules/helix-spectator/pom.xml                    |  11 +-
 .../airavata/helix/impl/task/AiravataTask.java     |   2 +-
 .../helix/impl/task/OutputDataStagingTask.java     |  52 ++++-
 .../submission/task/DefaultJobSubmissionTask.java  |   3 +
 .../task/submission/task/JobSubmissionTask.java    |  32 +++
 .../helix/impl/workflow/PostWorkflowManager.java   | 256 +++++++++++++++++++++
 ...SimpleWorkflow.java => PreWorkflowManager.java} |   2 +-
 modules/job-monitor/pom.xml                        |   5 +
 .../airavata/job/monitor/EmailBasedMonitor.java    |   7 +-
 .../monitor/kafka/JobStatusResultDeserializer.java |  34 +++
 .../monitor/kafka/JobStatusResultSerializer.java   |  29 +++
 .../job/monitor/kafka/MessageProducer.java         |  36 +++
 12 files changed, 460 insertions(+), 9 deletions(-)

diff --git a/modules/helix-spectator/pom.xml b/modules/helix-spectator/pom.xml
index 36fb586..213f747 100644
--- a/modules/helix-spectator/pom.xml
+++ b/modules/helix-spectator/pom.xml
@@ -50,6 +50,15 @@
             <artifactId>groovy-templates</artifactId>
             <version>2.4.7</version>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>job-monitor</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 26361d2..e15195d 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -79,7 +79,7 @@ public abstract class AiravataTask extends AbstractTask {
         super.init(manager, workflowName, jobName, taskName);
         try {
             appCatalog = RegistryFactory.getAppCatalog();
-            experimentCatalog = RegistryFactory.getDefaultExpCatalog();
+            experimentCatalog = RegistryFactory.getExperimentCatalog(getGatewayId());
             processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
 
             this.computeResourceDescription = getAppCatalog().getComputeResource().getComputeResource(getProcessModel()
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
index d2280d0..f33523c 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
@@ -28,6 +28,7 @@ public class OutputDataStagingTask extends DataStagingTask {
     @Override
     public TaskResult onRun(TaskHelper taskHelper) {
 
+        logger.info("Starting output data staging task " + getTaskId());
         try {
             // Get and validate data staging task model
             DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel();
@@ -56,14 +57,37 @@ public class OutputDataStagingTask extends DataStagingTask {
             String sourceFileName;
             try {
                 sourceURI = new URI(dataStagingTaskModel.getSource());
-                destinationURI = new URI(dataStagingTaskModel.getDestination());
+                sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
+                        sourceURI.getPath().length());
+
+                if (dataStagingTaskModel.getDestination().startsWith("dummy")) {
+                    String inputPath  = getTaskContext().getStorageFileSystemRootLocation();
+                    inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator);
+                    String experimentDataDir = getProcessModel().getExperimentDataDir();
+                    String filePath;
+                    if(experimentDataDir != null && !experimentDataDir.isEmpty()) {
+                        if(!experimentDataDir.endsWith(File.separator)){
+                            experimentDataDir += File.separator;
+                        }
+                        if (experimentDataDir.startsWith(File.separator)) {
+                            filePath = experimentDataDir + sourceFileName;
+                        } else {
+                            filePath = inputPath + experimentDataDir + sourceFileName;
+                        }
+                    } else {
+                        filePath = inputPath + getProcessId() + File.separator + sourceFileName;
+                    }
+
+                    destinationURI = new URI("file", getTaskContext().getStorageResourceLoginUserName(),
+                            storageResource.getHostName(), 22, filePath, null, null);
+
+                } else {
+                    destinationURI = new URI(dataStagingTaskModel.getDestination());
+                }
 
                 if (logger.isDebugEnabled()) {
                     logger.debug("Source file " + sourceURI.getPath() + ", destination uri " + destinationURI.getPath() + " for task " + getTaskId());
                 }
-
-                sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
-                        sourceURI.getPath().length());
             } catch (URISyntaxException e) {
                 throw new TaskOnFailException("Failed to obtain source URI for output data staging task " + getTaskId(), true, e);
             }
@@ -164,6 +188,26 @@ public class OutputDataStagingTask extends DataStagingTask {
         }
     }
 
+    public URI getDestinationURIFromDummy(String hostName, String inputPath, String fileName) throws URISyntaxException {
+        String experimentDataDir = getProcessModel().getExperimentDataDir();
+        String filePath;
+        if(experimentDataDir != null && !experimentDataDir.isEmpty()) {
+            if(!experimentDataDir.endsWith(File.separator)){
+                experimentDataDir += File.separator;
+            }
+            if (experimentDataDir.startsWith(File.separator)) {
+                filePath = experimentDataDir + fileName;
+            } else {
+                filePath = inputPath + experimentDataDir + fileName;
+            }
+        } else {
+            filePath = inputPath + getProcessId() + File.separator + fileName;
+        }
+        //FIXME
+        return new URI("file", getTaskContext().getStorageResourceLoginUserName(), hostName, 22, filePath, null, null);
+
+    }
+
     @Override
     public void onCancel() {
 
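
Note that the "dummy" destination branch added to onRun above duplicates the
path-resolution logic of the new getDestinationURIFromDummy helper (whose
//FIXME presumably flags the hard-coded "file" scheme, login user, and port 22,
which the inline branch hard-codes as well). A minimal sketch, not part of this
commit, of how the branch could delegate to the helper instead, assuming the
helper stays as committed:

    if (dataStagingTaskModel.getDestination().startsWith("dummy")) {
        String inputPath = getTaskContext().getStorageFileSystemRootLocation();
        inputPath = inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator;
        // Reuse the helper rather than repeating the experiment-data-dir resolution inline
        destinationURI = getDestinationURIFromDummy(storageResource.getHostName(), inputPath, sourceFileName);
    } else {
        destinationURI = new URI(dataStagingTaskModel.getDestination());
    }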
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
index e21f200..a60a955 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -36,6 +36,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
 
     @Override
     public TaskResult onRun(TaskHelper taskHelper) {
+
         try {
 
             GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build();
@@ -126,6 +127,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
                     logger.info("Received job id " + jobId + " from compute resource");
                     jobModel.setJobId(jobId);
                     saveJobModel(jobModel);
+
                     JobStatus jobStatus = new JobStatus();
                     jobStatus.setJobState(JobState.SUBMITTED);
                     jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
@@ -139,6 +141,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
                         jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                         jobModel.setJobStatuses(Arrays.asList(jobStatus));
                         saveJobStatus(jobModel);
+                        createMonitoringNode(jobId);
                     }
 
                     if (getComputeResourceDescription().isGatewayUsageReporting()){
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
index ac314e9..afa2630 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
@@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task.submission.task;
 import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.agents.api.CommandOutput;
 import org.apache.airavata.agents.api.JobSubmissionOutput;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.impl.task.AiravataTask;
@@ -27,9 +28,15 @@ import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.commons.io.FileUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.helix.HelixManager;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
 
 import java.io.File;
 import java.security.SecureRandom;
@@ -39,9 +46,34 @@ public abstract class JobSubmissionTask extends AiravataTask {
 
     private static final Logger logger = LogManager.getLogger(JobSubmissionTask.class);
 
+    private CuratorFramework curatorClient = null;
+
     @Override
     public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
         super.init(manager, workflowName, jobName, taskName);
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        try {
+            this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
+            this.curatorClient.start();
+        } catch (ApplicationSettingsException e) {
+            logger.error("Failed to create curator client ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public CuratorFramework getCuratorClient() {
+        return curatorClient;
+    }
+
+    // TODO perform exception handling
+    protected void createMonitoringNode(String jobId) throws Exception {
+        logger.info("Creating zookeeper paths for job monitoring for job id : " + jobId + ", process : "
+                + getProcessId() + ", gateway : " + getGatewayId());
+        this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/lock", new byte[0]);
+        this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/gateway", getGatewayId().getBytes());
+        this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/process", getProcessId().getBytes());
+        this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/status", "pending".getBytes());
     }
 
     //////////////////////
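
For reference, a successful createMonitoringNode call leaves the following
persistent znodes in ZooKeeper, which PostWorkflowManager (below) reads back
when a parsed job status arrives:

    /monitoring/<jobId>/lock      (empty)
    /monitoring/<jobId>/gateway   -> gateway id bytes
    /monitoring/<jobId>/process   -> process id bytes
    /monitoring/<jobId>/status    -> "pending"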
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
new file mode 100644
index 0000000..25f8ec5
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -0,0 +1,256 @@
+package org.apache.airavata.helix.impl.workflow;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.EnvSetupTask;
+import org.apache.airavata.helix.impl.task.InputDataStagingTask;
+import org.apache.airavata.helix.impl.task.OutputDataStagingTask;
+import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask;
+import org.apache.airavata.helix.impl.task.submission.task.JobSubmissionTask;
+import org.apache.airavata.helix.workflow.WorkflowManager;
+import org.apache.airavata.job.monitor.kafka.JobStatusResultDeserializer;
+import org.apache.airavata.job.monitor.parser.JobStatusResult;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class PostWorkflowManager {
+
+    private static final Logger logger = LogManager.getLogger(PostWorkflowManager.class);
+
+    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+    private static final String TOPIC = "parsed-data";
+
+    private CuratorFramework curatorClient = null;
+
+    private void init() throws ApplicationSettingsException {
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
+        this.curatorClient.start();
+    }
+
+    private Consumer<String, JobStatusResult> createConsumer() {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "MonitoringConsumer");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JobStatusResultDeserializer.class.getName());
+        // Create the consumer using props.
+        final Consumer<String, JobStatusResult> consumer = new KafkaConsumer<String, JobStatusResult>(props);
+        // Subscribe to the topic.
+        consumer.subscribe(Collections.singletonList(TOPIC));
+        return consumer;
+    }
+
+    private String getProcessIdByJobId(String jobId) throws Exception {
+        byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/process");
+        String process = new String(processBytes);
+        return process;
+    }
+
+    private String getGatewayByJobId(String jobId) throws Exception {
+        byte[] gatewayBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/gateway");
+        String gateway = new String(gatewayBytes);
+        return gateway;
+    }
+
+    private String getStatusByJobId(String jobId) throws Exception {
+        byte[] statusBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/status");
+        String status = new String(statusBytes);
+        return status;
+    }
+
+    private boolean hasMonitoringRegistered(String jobId) throws Exception {
+        Stat stat = this.curatorClient.checkExists().forPath("/monitoring/" + jobId);
+        return stat != null;
+    }
+
+    private void process(JobStatusResult jobStatusResult) {
+
+        if (jobStatusResult == null) {
+            return;
+        }
+
+        try {
+            logger.info("Processing job result " + jobStatusResult.getJobId());
+
+            if (hasMonitoringRegistered(jobStatusResult.getJobId())) {
+                String gateway = getGatewayByJobId(jobStatusResult.getJobId());
+                String processId = getProcessIdByJobId(jobStatusResult.getJobId());
+                String status = getStatusByJobId(jobStatusResult.getJobId());
+
+                // TODO get cluster lock before that
+                if ("cancelled".equals(status)) {
+
+                } else {
+
+                    if (jobStatusResult.getState() == JobState.COMPLETE) {
+                        logger.info("Job " + jobStatusResult.getJobId() + " was completed");
+
+                        ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway);
+                        ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
+                        ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, processModel.getExperimentId());
+                        String taskDag = processModel.getTaskDag();
+                        List<TaskModel> taskList = processModel.getTasks();
+
+                        String[] taskIds = taskDag.split(",");
+                        final List<AiravataTask> allTasks = new ArrayList<>();
+
+                        boolean jobSubmissionFound = false;
+
+                        for (String taskId : taskIds) {
+                            Optional<TaskModel> model = taskList.stream().filter(taskModel -> taskModel.getTaskId().equals(taskId)).findFirst();
+
+                            if (model.isPresent()) {
+                                TaskModel taskModel = model.get();
+                                AiravataTask airavataTask = null;
+                                if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) {
+                                    jobSubmissionFound = true;
+                                } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) {
+                                    if (jobSubmissionFound) {
+                                        airavataTask = new OutputDataStagingTask();
+                                    }
+                                }
+
+                                if (airavataTask != null) {
+                                    airavataTask.setGatewayId(experimentModel.getGatewayId());
+                                    airavataTask.setExperimentId(experimentModel.getExperimentId());
+                                    airavataTask.setProcessId(processModel.getProcessId());
+                                    airavataTask.setTaskId(taskModel.getTaskId());
+                                    if (allTasks.size() > 0) {
+                                        allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(airavataTask.getTaskId(), airavataTask));
+                                    }
+                                    allTasks.add(airavataTask);
+                                }
+                            }
+                        }
+                        WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster",
+                                "wm-23", ServerSettings.getZookeeperConnection());
+
+                        workflowManager.launchWorkflow(UUID.randomUUID().toString(),
+                                allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true);
+
+                    } else if (jobStatusResult.getState() == JobState.CANCELED) {
+                        logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled");
+
+                    } else if (jobStatusResult.getState() == JobState.FAILED) {
+                        logger.info("Job " + jobStatusResult.getJobId() + " failed");
+
+                    } else if (jobStatusResult.getState() == JobState.SUBMITTED) {
+                        logger.info("Job " + jobStatusResult.getJobId() + " was submitted");
+
+                    }
+                }
+            } else {
+                logger.warn("Could not find a monitoring registration for job id " + jobStatusResult.getJobId());
+            }
+        } catch (Exception e) {
+            logger.error("Failed to process job : " + jobStatusResult.getJobId() + ", with status : " + jobStatusResult.getState().name(), e);
+        }
+    }
+
+    private void runConsumer() throws InterruptedException {
+        final Consumer<String, JobStatusResult> consumer = createConsumer();
+
+        final int giveUp = 100;
+        int noRecordsCount = 0;
+
+        while (true) {
+            final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(1000);
+
+            /*if (consumerRecords.count() == 0) {
+                noRecordsCount++;
+                if (noRecordsCount > giveUp) break;
+                else continue;
+            }*/
+
+            consumerRecords.forEach(record -> {
+                process(record.value());
+            });
+
+            consumer.commitAsync();
+        }
+        //consumer.close();
+        //System.out.println("DONE");
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        PostWorkflowManager postManager = new PostWorkflowManager();
+        postManager.init();
+        postManager.runConsumer();
+        /*
+        String processId = "PROCESS_5b252ad9-d630-4cf9-80e3-0c30c55d1001";
+        ExperimentCatalog experimentCatalog = RegistryFactory.getDefaultExpCatalog();
+
+        ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
+        ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, processModel.getExperimentId());
+        String taskDag = processModel.getTaskDag();
+        List<TaskModel> taskList = processModel.getTasks();
+
+        String[] taskIds = taskDag.split(",");
+        final List<AiravataTask> allTasks = new ArrayList<>();
+
+        boolean jobSubmissionFound = false;
+
+        for (String taskId : taskIds) {
+            Optional<TaskModel> model = taskList.stream().filter(taskModel -> taskModel.getTaskId().equals(taskId)).findFirst();
+
+            if (model.isPresent()) {
+                TaskModel taskModel = model.get();
+                AiravataTask airavataTask = null;
+                if (taskModel.getTaskType() == TaskTypes.ENV_SETUP) {
+                    //airavataTask = new EnvSetupTask();
+                } else if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) {
+                    //airavataTask = new DefaultJobSubmissionTask();
+                    //airavataTask.setRetryCount(1);
+                    jobSubmissionFound = true;
+                } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) {
+                    if (jobSubmissionFound) {
+                        airavataTask = new OutputDataStagingTask();
+                    } else {
+                        //airavataTask = new InputDataStagingTask();
+                    }
+                }
+
+                if (airavataTask != null) {
+                    airavataTask.setGatewayId(experimentModel.getGatewayId());
+                    airavataTask.setExperimentId(experimentModel.getExperimentId());
+                    airavataTask.setProcessId(processModel.getProcessId());
+                    airavataTask.setTaskId(taskModel.getTaskId());
+                    if (allTasks.size() > 0) {
+                        allTasks.get(allTasks.size() -1).setNextTask(new OutPort(airavataTask.getTaskId(), airavataTask));
+                    }
+                    allTasks.add(airavataTask);
+                }
+            }
+        }
+
+        WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22", "localhost:2199");
+        workflowManager.launchWorkflow(UUID.randomUUID().toString(), allTasks.stream().map(t -> (AiravataTask)t).collect(Collectors.toList()), true);
+        */
+    }
+}
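
The poll loop above runs forever; the commented-out give-up counter and
consumer.close() hint at a shutdown path that is not wired up yet. One possible
shape for it (a sketch under that assumption, not part of this commit) is to
interrupt the blocking poll() from a JVM shutdown hook via Consumer.wakeup(),
which makes poll() throw WakeupException so close() can run:

    final Thread consumerThread = Thread.currentThread();
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        consumer.wakeup(); // safe to call from another thread; unblocks poll()
        try { consumerThread.join(); } catch (InterruptedException ignored) { }
    }));
    try {
        while (true) {
            final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(1000);
            consumerRecords.forEach(record -> process(record.value()));
            consumer.commitAsync();
        }
    } catch (org.apache.kafka.common.errors.WakeupException e) {
        // expected during shutdown
    } finally {
        consumer.close();
    }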
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
similarity index 99%
rename from modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
rename to modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index abd36e1..9814b01 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -23,7 +23,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-public class SimpleWorkflow {
+public class PreWorkflowManager {
 
     public static void main(String[] args) throws Exception {
 
diff --git a/modules/job-monitor/pom.xml b/modules/job-monitor/pom.xml
index c536a14..7a69882 100644
--- a/modules/job-monitor/pom.xml
+++ b/modules/job-monitor/pom.xml
@@ -33,6 +33,11 @@
             <artifactId>snakeyaml</artifactId>
             <version>1.15</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>1.0.0</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/EmailBasedMonitor.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/EmailBasedMonitor.java
index 7b13354..e41f500 100644
--- a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/EmailBasedMonitor.java
+++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/EmailBasedMonitor.java
@@ -2,6 +2,7 @@ package org.apache.airavata.job.monitor;
 
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.job.monitor.kafka.MessageProducer;
 import org.apache.airavata.job.monitor.parser.EmailParser;
 import org.apache.airavata.job.monitor.parser.JobStatusResult;
 import org.apache.airavata.job.monitor.parser.ResourceConfig;
@@ -48,6 +49,7 @@ public class EmailBasedMonitor implements Runnable {
     private Map<String, Boolean> canceledJobs = new ConcurrentHashMap<>();
     private Timer timer;
     private Map<ResourceJobManagerType, ResourceConfig> resourceConfigs = new HashMap<>();
+    private MessageProducer messageProducer = new MessageProducer();
 
 
     public EmailBasedMonitor() throws Exception {
@@ -235,8 +237,9 @@ public class EmailBasedMonitor implements Runnable {
             try {
                 JobStatusResult jobStatusResult = parse(message);
                 log.info(jobStatusResult.getJobId() + ", " + jobStatusResult.getJobName() + ", " + jobStatusResult.getState().getValue());
-                //processedMessages.add(message);
-                unreadMessages.add(message);
+                messageProducer.submitMessageToQueue(jobStatusResult);
+                processedMessages.add(message);
+                //unreadMessages.add(message);
             } catch (Exception e) {
                 unreadMessages.add(message);
             }
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java
new file mode 100644
index 0000000..c3c7877
--- /dev/null
+++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java
@@ -0,0 +1,34 @@
+package org.apache.airavata.job.monitor.kafka;
+
+import org.apache.airavata.job.monitor.parser.JobStatusResult;
+import org.apache.airavata.model.status.JobState;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+
+public class JobStatusResultDeserializer implements Deserializer<JobStatusResult> {
+    @Override
+    public void configure(Map<String, ?> map, boolean b) {
+
+    }
+
+    @Override
+    public JobStatusResult deserialize(String s, byte[] bytes) {
+        String deserializedData = new String(bytes);
+        String[] parts = deserializedData.split(",");
+        JobStatusResult jobStatusResult = new JobStatusResult();
+        jobStatusResult.setJobId(parts[0]);
+        jobStatusResult.setJobName(parts[1]);
+        jobStatusResult.setState(JobState.valueOf(parts[2]));
+        return jobStatusResult;
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java
new file mode 100644
index 0000000..a0dc6ec
--- /dev/null
+++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java
@@ -0,0 +1,29 @@
+package org.apache.airavata.job.monitor.kafka;
+
+import org.apache.airavata.job.monitor.parser.JobStatusResult;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+
+public class JobStatusResultSerializer implements Serializer<JobStatusResult> {
+
+    @Override
+    public void configure(Map<String, ?> map, boolean b) {
+
+    }
+
+    @Override
+    public byte[] serialize(String s, JobStatusResult jobStatusResult) {
+        String serializedData = jobStatusResult.getJobId() + "," + jobStatusResult.getJobName() + "," + jobStatusResult.getState().name();
+        return serializedData.getBytes();
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
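
The serializer and deserializer round-trip a JobStatusResult through a plain
comma-separated string, so the pair only holds up while job ids and job names
never contain commas. A minimal round-trip sketch, assuming the accessors used
elsewhere in this commit:

    JobStatusResult original = new JobStatusResult();
    original.setJobId("1234567");
    original.setJobName("test-job");
    original.setState(JobState.COMPLETE);

    byte[] bytes = new JobStatusResultSerializer().serialize("parsed-data", original);
    JobStatusResult copy = new JobStatusResultDeserializer().deserialize("parsed-data", bytes);
    // copy.getJobId() -> "1234567", copy.getState() -> JobState.COMPLETE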
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
new file mode 100644
index 0000000..748a533
--- /dev/null
+++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
@@ -0,0 +1,36 @@
+package org.apache.airavata.job.monitor.kafka;
+
+import org.apache.airavata.job.monitor.parser.JobStatusResult;
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+public class MessageProducer {
+    private final static String TOPIC = "parsed-data";
+    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
+
+    final Producer<String, JobStatusResult> producer;
+
+    public MessageProducer() {
+        producer = createProducer();
+    }
+
+    private Producer<String, JobStatusResult> createProducer() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                BOOTSTRAP_SERVERS);
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                JobStatusResultSerializer.class.getName());
+        return new KafkaProducer<String, JobStatusResult>(props);
+    }
+
+    public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException {
+        final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(TOPIC, jobStatusResult);
+        RecordMetadata recordMetadata = producer.send(record).get();
+    }
+}
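
submitMessageToQueue publishes synchronously: send(...).get() blocks until the
broker acknowledges the record, so a Kafka failure surfaces to the caller and
EmailBasedMonitor keeps the email unread for the next poll. The returned
RecordMetadata is currently discarded; a sketch of logging it instead (the
metadata accessors are standard kafka-clients API; a real version would use a
logger rather than System.out):

    public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException {
        final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(TOPIC, jobStatusResult);
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("Published job " + jobStatusResult.getJobId() + " to " + metadata.topic()
                + "-" + metadata.partition() + " at offset " + metadata.offset());
    }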
