You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:10:05 UTC
[airavata] 08/17: Implementing post workflow
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 42ff5f4e2fbdbb1421f686b702a5eb76918bb4d3
Author: dimuthu <di...@gmail.com>
AuthorDate: Sun Mar 4 13:21:55 2018 -0500
Implementing post workflow
---
modules/helix-spectator/pom.xml | 11 +-
.../airavata/helix/impl/task/AiravataTask.java | 2 +-
.../helix/impl/task/OutputDataStagingTask.java | 52 ++++-
.../submission/task/DefaultJobSubmissionTask.java | 3 +
.../task/submission/task/JobSubmissionTask.java | 32 +++
.../helix/impl/workflow/PostWorkflowManager.java | 256 +++++++++++++++++++++
...SimpleWorkflow.java => PreWorkflowManager.java} | 2 +-
modules/job-monitor/pom.xml | 5 +
.../airavata/job/monitor/EmailBasedMonitor.java | 7 +-
.../monitor/kafka/JobStatusResultDeserializer.java | 34 +++
.../monitor/kafka/JobStatusResultSerializer.java | 29 +++
.../job/monitor/kafka/MessageProducer.java | 36 +++
12 files changed, 460 insertions(+), 9 deletions(-)
diff --git a/modules/helix-spectator/pom.xml b/modules/helix-spectator/pom.xml
index 36fb586..213f747 100644
--- a/modules/helix-spectator/pom.xml
+++ b/modules/helix-spectator/pom.xml
@@ -50,6 +50,15 @@
<artifactId>groovy-templates</artifactId>
<version>2.4.7</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>job-monitor</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 26361d2..e15195d 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -79,7 +79,7 @@ public abstract class AiravataTask extends AbstractTask {
super.init(manager, workflowName, jobName, taskName);
try {
appCatalog = RegistryFactory.getAppCatalog();
- experimentCatalog = RegistryFactory.getDefaultExpCatalog();
+ experimentCatalog = RegistryFactory.getExperimentCatalog(getGatewayId());
processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
this.computeResourceDescription = getAppCatalog().getComputeResource().getComputeResource(getProcessModel()
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
index d2280d0..f33523c 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
@@ -28,6 +28,7 @@ public class OutputDataStagingTask extends DataStagingTask {
@Override
public TaskResult onRun(TaskHelper taskHelper) {
+ logger.info("Starting output data staging task " + getTaskId());
try {
// Get and validate data staging task model
DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel();
@@ -56,14 +57,37 @@ public class OutputDataStagingTask extends DataStagingTask {
String sourceFileName;
try {
sourceURI = new URI(dataStagingTaskModel.getSource());
- destinationURI = new URI(dataStagingTaskModel.getDestination());
+ sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
+ sourceURI.getPath().length());
+
+ if (dataStagingTaskModel.getDestination().startsWith("dummy")) {
+ String inputPath = getTaskContext().getStorageFileSystemRootLocation();
+ inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator);
+ String experimentDataDir = getProcessModel().getExperimentDataDir();
+ String filePath;
+ if(experimentDataDir != null && !experimentDataDir.isEmpty()) {
+ if(!experimentDataDir.endsWith(File.separator)){
+ experimentDataDir += File.separator;
+ }
+ if (experimentDataDir.startsWith(File.separator)) {
+ filePath = experimentDataDir + sourceFileName;
+ } else {
+ filePath = inputPath + experimentDataDir + sourceFileName;
+ }
+ } else {
+ filePath = inputPath + getProcessId() + File.separator + sourceFileName;
+ }
+
+ destinationURI = new URI("file", getTaskContext().getStorageResourceLoginUserName(),
+ storageResource.getHostName(), 22, filePath, null, null);
+
+ } else {
+ destinationURI = new URI(dataStagingTaskModel.getDestination());
+ }
if (logger.isDebugEnabled()) {
logger.debug("Source file " + sourceURI.getPath() + ", destination uri " + destinationURI.getPath() + " for task " + getTaskId());
}
-
- sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
- sourceURI.getPath().length());
} catch (URISyntaxException e) {
throw new TaskOnFailException("Failed to obtain source URI for output data staging task " + getTaskId(), true, e);
}
@@ -164,6 +188,26 @@ public class OutputDataStagingTask extends DataStagingTask {
}
}
+ /**
+  * Builds the destination URI on the storage resource for an output file whose configured
+  * destination is only a placeholder.
+  *
+  * <p>Path resolution:
+  * <ul>
+  *   <li>experiment data dir set and absolute: {@code <experimentDataDir>/<fileName>}</li>
+  *   <li>experiment data dir set and relative: {@code <inputPath>/<experimentDataDir>/<fileName>}</li>
+  *   <li>experiment data dir missing or empty: {@code <inputPath>/<processId>/<fileName>}</li>
+  * </ul>
+  *
+  * @param hostName  host name of the storage resource
+  * @param inputPath root location on the storage file system; a trailing separator is added if missing
+  * @param fileName  name of the file being staged out
+  * @return a {@code file://user@host:22/path} URI pointing at the resolved location
+  * @throws URISyntaxException if the resolved components do not form a valid URI
+  */
+ public URI getDestinationURIFromDummy(String hostName, String inputPath, String fileName) throws URISyntaxException {
+     // Normalise the root the same way onRun() does, so the next path segment is never glued onto it.
+     String rootPath = inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator;
+     String experimentDataDir = getProcessModel().getExperimentDataDir();
+     String filePath;
+     if (experimentDataDir != null && !experimentDataDir.isEmpty()) {
+         if (!experimentDataDir.endsWith(File.separator)) {
+             experimentDataDir += File.separator;
+         }
+         if (experimentDataDir.startsWith(File.separator)) {
+             // Absolute experiment data dir: use it as is.
+             filePath = experimentDataDir + fileName;
+         } else {
+             filePath = rootPath + experimentDataDir + fileName;
+         }
+     } else {
+         filePath = rootPath + getProcessId() + File.separator + fileName;
+     }
+     // FIXME (marker kept from the original): presumably about the hard-coded "file" scheme and
+     // port 22 below - confirm what was intended.
+     return new URI("file", getTaskContext().getStorageResourceLoginUserName(), hostName, 22, filePath, null, null);
+ }
+
@Override
public void onCancel() {
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
index e21f200..a60a955 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -36,6 +36,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
@Override
public TaskResult onRun(TaskHelper taskHelper) {
+
try {
GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build();
@@ -126,6 +127,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
logger.info("Received job id " + jobId + " from compute resource");
jobModel.setJobId(jobId);
saveJobModel(jobModel);
+
JobStatus jobStatus = new JobStatus();
jobStatus.setJobState(JobState.SUBMITTED);
jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
@@ -139,6 +141,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatuses(Arrays.asList(jobStatus));
saveJobStatus(jobModel);
+ createMonitoringNode(jobId);
}
if (getComputeResourceDescription().isGatewayUsageReporting()){
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
index ac314e9..afa2630 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
@@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task.submission.task;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.CommandOutput;
import org.apache.airavata.agents.api.JobSubmissionOutput;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.impl.task.AiravataTask;
@@ -27,9 +28,15 @@ import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.registry.cpi.*;
import org.apache.commons.io.FileUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.helix.HelixManager;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
import java.io.File;
import java.security.SecureRandom;
@@ -39,9 +46,34 @@ public abstract class JobSubmissionTask extends AiravataTask {
private static final Logger logger = LogManager.getLogger(JobSubmissionTask.class);
+ private CuratorFramework curatorClient = null;
+
+ /**
+  * Initialises the task and starts a Curator client against the ZooKeeper connection string
+  * returned by {@code ServerSettings.getZookeeperConnection()}.
+  *
+  * @throws RuntimeException wrapping the {@code ApplicationSettingsException} raised when the
+  *                          ZooKeeper connection setting cannot be read
+  */
@Override
public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
super.init(manager, workflowName, jobName, taskName);
+     // Retry with exponential back-off: 1000 ms base sleep, at most 3 retries.
+     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+     try {
+         // NOTE(review): nothing visible here closes this client - confirm who owns its lifecycle.
+         this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
+         this.curatorClient.start();
+     } catch (ApplicationSettingsException e) {
+         // The logger already records the stack trace; no separate printStackTrace() is needed.
+         logger.error("Failed to create curator client ", e);
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Exposes the Curator client held by this task.
+  *
+  * @return the client assigned in {@code init}; {@code null} until then
+  */
+ public CuratorFramework getCuratorClient() {
+     return this.curatorClient;
+ }
+
+ /**
+  * Registers a submitted job for monitoring by creating persistent ZooKeeper nodes under
+  * {@code /monitoring/<jobId>}:
+  * <ul>
+  *   <li>{@code lock} - empty payload</li>
+  *   <li>{@code gateway} - the gateway id</li>
+  *   <li>{@code process} - the process id</li>
+  *   <li>{@code status} - the literal {@code pending}</li>
+  * </ul>
+  * Text payloads are written as UTF-8 so they do not depend on the JVM's default charset.
+  *
+  * <p>TODO perform exception handling (the four nodes are created one by one; a failure part-way
+  * through is simply propagated to the caller).
+  *
+  * @param jobId id of the job returned by the compute resource
+  * @throws Exception if any of the ZooKeeper create calls fails
+  */
+ protected void createMonitoringNode(String jobId) throws Exception {
+     logger.info("Creating zookeeper paths for job monitoring for job id : " + jobId + ", process : "
+             + getProcessId() + ", gateway : " + getGatewayId());
+     final String jobPath = "/monitoring/" + jobId;
+     final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+     this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(jobPath + "/lock", new byte[0]);
+     this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(jobPath + "/gateway", getGatewayId().getBytes(utf8));
+     this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(jobPath + "/process", getProcessId().getBytes(utf8));
+     this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(jobPath + "/status", "pending".getBytes(utf8));
}
//////////////////////
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
new file mode 100644
index 0000000..25f8ec5
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -0,0 +1,256 @@
+package org.apache.airavata.helix.impl.workflow;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.EnvSetupTask;
+import org.apache.airavata.helix.impl.task.InputDataStagingTask;
+import org.apache.airavata.helix.impl.task.OutputDataStagingTask;
+import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask;
+import org.apache.airavata.helix.impl.task.submission.task.JobSubmissionTask;
+import org.apache.airavata.helix.workflow.WorkflowManager;
+import org.apache.airavata.job.monitor.kafka.JobStatusResultDeserializer;
+import org.apache.airavata.job.monitor.parser.JobStatusResult;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Consumes parsed job status results from a Kafka topic and, when a monitored job is reported as
+ * {@code COMPLETE}, launches the output data staging tasks of the owning process as a workflow.
+ *
+ * <p>The job-to-process mapping is read from ZooKeeper nodes under {@code /monitoring/<jobId>}
+ * ({@code gateway}, {@code process}, {@code status}).
+ */
+public class PostWorkflowManager {
+
+    private static final Logger logger = LogManager.getLogger(PostWorkflowManager.class);
+
+    // TODO: broker address and topic are hard-coded; they should come from configuration.
+    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+    private static final String TOPIC = "parsed-data";
+    private static final String MONITORING_ROOT = "/monitoring/";
+
+    private CuratorFramework curatorClient = null;
+
+    /**
+     * Starts the Curator client used for all monitoring look-ups.
+     *
+     * @throws ApplicationSettingsException if the ZooKeeper connection setting cannot be read
+     */
+    private void init() throws ApplicationSettingsException {
+        // Retry with exponential back-off: 1000 ms base sleep, at most 3 retries.
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
+        this.curatorClient.start();
+    }
+
+    /**
+     * Creates a Kafka consumer in group {@code MonitoringConsumer} subscribed to the job status topic.
+     *
+     * @return a consumer whose values are decoded by {@link JobStatusResultDeserializer}
+     */
+    private Consumer<String, JobStatusResult> createConsumer() {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "MonitoringConsumer");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JobStatusResultDeserializer.class.getName());
+        final Consumer<String, JobStatusResult> consumer = new KafkaConsumer<String, JobStatusResult>(props);
+        consumer.subscribe(Collections.singletonList(TOPIC));
+        return consumer;
+    }
+
+    /**
+     * Reads one child node of {@code /monitoring/<jobId>} and decodes it as UTF-8 text.
+     *
+     * @param jobId job whose monitoring entry is read
+     * @param child name of the child node, e.g. {@code process}
+     * @return the node payload as a string
+     * @throws Exception if the ZooKeeper read fails
+     */
+    private String readMonitoringData(String jobId, String child) throws Exception {
+        byte[] data = this.curatorClient.getData().forPath(MONITORING_ROOT + jobId + "/" + child);
+        // Explicit charset: the result must not depend on the JVM default.
+        return new String(data, java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+    /** Returns the process id registered for the given job. */
+    private String getProcessIdByJobId(String jobId) throws Exception {
+        return readMonitoringData(jobId, "process");
+    }
+
+    /** Returns the gateway id registered for the given job. */
+    private String getGatewayByJobId(String jobId) throws Exception {
+        return readMonitoringData(jobId, "gateway");
+    }
+
+    /** Returns the monitoring status string registered for the given job. */
+    private String getStatusByJobId(String jobId) throws Exception {
+        return readMonitoringData(jobId, "status");
+    }
+
+    /** Tells whether a {@code /monitoring/<jobId>} node exists. */
+    private boolean hasMonitoringRegistered(String jobId) throws Exception {
+        Stat stat = this.curatorClient.checkExists().forPath(MONITORING_ROOT + jobId);
+        return stat != null;
+    }
+
+    /**
+     * Handles one job status result. Only {@code COMPLETE} triggers work (launching output staging);
+     * the other handled states are just logged. Failures are logged and never propagated, so one bad
+     * record cannot stop the consumer loop.
+     *
+     * @param jobStatusResult the decoded record value; {@code null} is ignored
+     */
+    private void process(JobStatusResult jobStatusResult) {
+
+        if (jobStatusResult == null) {
+            return;
+        }
+
+        final String jobId = jobStatusResult.getJobId();
+        try {
+            logger.info("Processing job result " + jobId);
+
+            if (!hasMonitoringRegistered(jobId)) {
+                logger.warn("Could not find a monitoring register for job id " + jobId);
+                return;
+            }
+
+            String gateway = getGatewayByJobId(jobId);
+            String processId = getProcessIdByJobId(jobId);
+            String status = getStatusByJobId(jobId);
+
+            // TODO get cluster lock before that
+            if ("cancelled".equals(status)) {
+                // Nothing is done for jobs whose monitoring entry is marked as cancelled.
+                return;
+            }
+
+            final JobState state = jobStatusResult.getState();
+            if (state == JobState.COMPLETE) {
+                logger.info("Job " + jobId + " was completed");
+                launchOutputStaging(gateway, processId);
+
+            } else if (state == JobState.CANCELED) {
+                logger.info("Job " + jobId + " was externally cancelled");
+
+            } else if (state == JobState.FAILED) {
+                logger.info("Job " + jobId + " was failed");
+
+            } else if (state == JobState.SUBMITTED) {
+                logger.info("Job " + jobId + " was submitted");
+            }
+        } catch (Exception e) {
+            // Concatenate the state instead of calling name() on it: a null state would otherwise
+            // throw from inside this handler and escape into the consumer loop.
+            logger.error("Failed to process job : " + jobId + ", with status : " + jobStatusResult.getState(), e);
+        }
+    }
+
+    /**
+     * Loads the process and experiment models and launches the process' output staging tasks.
+     *
+     * @param gateway   gateway id used to select the experiment catalog
+     * @param processId id of the process whose job completed
+     * @throws Exception if the registry look-up or the workflow launch fails
+     */
+    private void launchOutputStaging(String gateway, String processId) throws Exception {
+        ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway);
+        ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
+        ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, processModel.getExperimentId());
+
+        final List<AiravataTask> allTasks = buildOutputStagingTasks(processModel, experimentModel);
+
+        // NOTE(review): a new WorkflowManager is created for every completed job and the
+        // cluster / instance names are hard-coded - confirm this is intended.
+        WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster",
+                "wm-23", ServerSettings.getZookeeperConnection());
+
+        workflowManager.launchWorkflow(UUID.randomUUID().toString(),
+                allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true);
+    }
+
+    /**
+     * Walks the process' task DAG (a comma separated list of task ids) in order and creates an
+     * {@link OutputDataStagingTask} for every data staging task that comes after the job
+     * submission task. Each created task is chained to the previous one through its out port.
+     *
+     * @param processModel    process whose DAG and task models are used
+     * @param experimentModel experiment supplying gateway and experiment ids
+     * @return the chained output staging tasks, in DAG order; empty if there are none
+     */
+    private List<AiravataTask> buildOutputStagingTasks(ProcessModel processModel, ExperimentModel experimentModel) {
+        String taskDag = processModel.getTaskDag();
+        List<TaskModel> taskList = processModel.getTasks();
+
+        final List<AiravataTask> allTasks = new ArrayList<>();
+        boolean jobSubmissionFound = false;
+
+        for (String taskId : taskDag.split(",")) {
+            Optional<TaskModel> model = taskList.stream().filter(candidate -> candidate.getTaskId().equals(taskId)).findFirst();
+            if (!model.isPresent()) {
+                continue;
+            }
+
+            TaskModel taskModel = model.get();
+            if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) {
+                jobSubmissionFound = true;
+                continue;
+            }
+            // Data staging tasks before the job submission are inputs and are not re-run here.
+            if (taskModel.getTaskType() != TaskTypes.DATA_STAGING || !jobSubmissionFound) {
+                continue;
+            }
+
+            AiravataTask airavataTask = new OutputDataStagingTask();
+            airavataTask.setGatewayId(experimentModel.getGatewayId());
+            airavataTask.setExperimentId(experimentModel.getExperimentId());
+            airavataTask.setProcessId(processModel.getProcessId());
+            airavataTask.setTaskId(taskModel.getTaskId());
+            if (allTasks.size() > 0) {
+                allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(airavataTask.getTaskId(), airavataTask));
+            }
+            allTasks.add(airavataTask);
+        }
+        return allTasks;
+    }
+
+    /**
+     * Polls the topic forever, handing every record value to {@link #process} and committing
+     * offsets asynchronously after each batch. The consumer is closed if the loop is left through
+     * an exception.
+     */
+    private void runConsumer() throws InterruptedException {
+        final Consumer<String, JobStatusResult> consumer = createConsumer();
+        try {
+            while (true) {
+                final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(1000);
+                consumerRecords.forEach(record -> process(record.value()));
+                consumer.commitAsync();
+            }
+        } finally {
+            consumer.close();
+        }
+    }
+
+    /** Entry point: connects to ZooKeeper and runs the consumer loop. */
+    public static void main(String[] args) throws Exception {
+        PostWorkflowManager postManager = new PostWorkflowManager();
+        postManager.init();
+        postManager.runConsumer();
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
similarity index 99%
rename from modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
rename to modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index abd36e1..9814b01 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -23,7 +23,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
-public class SimpleWorkflow {
+public class PreWorkflowManager {
public static void main(String[] args) throws Exception {
diff --git a/modules/job-monitor/pom.xml b/modules/job-monitor/pom.xml
index c536a14..7a69882 100644
--- a/modules/job-monitor/pom.xml
+++ b/modules/job-monitor/pom.xml
@@ -33,6 +33,11 @@
<artifactId>snakeyaml</artifactId>
<version>1.15</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>1.0.0</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/EmailBasedMonitor.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/EmailBasedMonitor.java
index 7b13354..e41f500 100644
--- a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/EmailBasedMonitor.java
+++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/EmailBasedMonitor.java
@@ -2,6 +2,7 @@ package org.apache.airavata.job.monitor;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.job.monitor.kafka.MessageProducer;
import org.apache.airavata.job.monitor.parser.EmailParser;
import org.apache.airavata.job.monitor.parser.JobStatusResult;
import org.apache.airavata.job.monitor.parser.ResourceConfig;
@@ -48,6 +49,7 @@ public class EmailBasedMonitor implements Runnable {
private Map<String, Boolean> canceledJobs = new ConcurrentHashMap<>();
private Timer timer;
private Map<ResourceJobManagerType, ResourceConfig> resourceConfigs = new HashMap<>();
+ private MessageProducer messageProducer = new MessageProducer();
public EmailBasedMonitor() throws Exception {
@@ -235,8 +237,9 @@ public class EmailBasedMonitor implements Runnable {
try {
JobStatusResult jobStatusResult = parse(message);
log.info(jobStatusResult.getJobId() + ", " + jobStatusResult.getJobName() + ", " + jobStatusResult.getState().getValue());
- //processedMessages.add(message);
- unreadMessages.add(message);
+ messageProducer.submitMessageToQueue(jobStatusResult);
+ processedMessages.add(message);
+ //unreadMessages.add(message);
} catch (Exception e) {
unreadMessages.add(message);
}
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java
new file mode 100644
index 0000000..c3c7877
--- /dev/null
+++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java
@@ -0,0 +1,34 @@
+package org.apache.airavata.job.monitor.kafka;
+
+import org.apache.airavata.job.monitor.parser.JobStatusResult;
+import org.apache.airavata.model.status.JobState;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.util.Map;
+
+/**
+ * Kafka value deserializer for {@link JobStatusResult}.
+ *
+ * <p>The expected payload is the comma separated text {@code <jobId>,<jobName>,<state>} where
+ * {@code state} is a {@link JobState} constant name. The job id ends at the first comma and the
+ * state starts after the last one, so a job name that itself contains commas is preserved.
+ */
+public class JobStatusResultDeserializer implements Deserializer<JobStatusResult> {
+
+    @Override
+    public void configure(Map<String, ?> map, boolean b) {
+        // Nothing to configure.
+    }
+
+    /**
+     * Decodes one record value.
+     *
+     * @param s     topic name (unused)
+     * @param bytes raw record value, decoded as UTF-8; may be {@code null}
+     * @return the decoded result, or {@code null} when {@code bytes} is {@code null}
+     * @throws org.apache.kafka.common.errors.SerializationException if the payload does not have
+     *         three comma separated parts
+     * @throws IllegalArgumentException if the state part is not a {@link JobState} constant name
+     */
+    @Override
+    public JobStatusResult deserialize(String s, byte[] bytes) {
+        if (bytes == null) {
+            // Records without a value carry no status.
+            return null;
+        }
+
+        String payload = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
+        int firstComma = payload.indexOf(',');
+        int lastComma = payload.lastIndexOf(',');
+        if (firstComma < 0 || firstComma == lastComma) {
+            throw new org.apache.kafka.common.errors.SerializationException(
+                    "Malformed job status record, expected <jobId>,<jobName>,<state> but got: " + payload);
+        }
+
+        JobStatusResult jobStatusResult = new JobStatusResult();
+        jobStatusResult.setJobId(payload.substring(0, firstComma));
+        jobStatusResult.setJobName(payload.substring(firstComma + 1, lastComma));
+        jobStatusResult.setState(JobState.valueOf(payload.substring(lastComma + 1)));
+        return jobStatusResult;
+    }
+
+    @Override
+    public void close() {
+        // No resources held.
+    }
+}
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java
new file mode 100644
index 0000000..a0dc6ec
--- /dev/null
+++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java
@@ -0,0 +1,29 @@
+package org.apache.airavata.job.monitor.kafka;
+
+import org.apache.airavata.job.monitor.parser.JobStatusResult;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+
+/**
+ * Kafka value serializer for {@link JobStatusResult}.
+ *
+ * <p>Writes the comma separated text {@code <jobId>,<jobName>,<state>} encoded as UTF-8, where
+ * {@code state} is the constant name of the result's job state.
+ */
+public class JobStatusResultSerializer implements Serializer<JobStatusResult> {
+
+    @Override
+    public void configure(Map<String, ?> map, boolean b) {
+        // Nothing to configure.
+    }
+
+    /**
+     * Encodes one job status result.
+     *
+     * @param s               topic name (unused)
+     * @param jobStatusResult value to encode; may be {@code null}
+     * @return the encoded bytes, or {@code null} when {@code jobStatusResult} is {@code null}
+     * @throws org.apache.kafka.common.errors.SerializationException if the result has no state
+     */
+    @Override
+    public byte[] serialize(String s, JobStatusResult jobStatusResult) {
+        if (jobStatusResult == null) {
+            return null;
+        }
+        if (jobStatusResult.getState() == null) {
+            // Fail with a clear message rather than a bare NullPointerException.
+            throw new org.apache.kafka.common.errors.SerializationException(
+                    "Cannot serialize a job status result without a state, job id: " + jobStatusResult.getJobId());
+        }
+        String serializedData = jobStatusResult.getJobId() + "," + jobStatusResult.getJobName() + "," + jobStatusResult.getState().name();
+        // Explicit charset so producer and consumer agree regardless of JVM defaults.
+        return serializedData.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public void close() {
+        // No resources held.
+    }
+}
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
new file mode 100644
index 0000000..748a533
--- /dev/null
+++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
@@ -0,0 +1,36 @@
+package org.apache.airavata.job.monitor.kafka;
+
+import org.apache.airavata.job.monitor.parser.JobStatusResult;
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Publishes {@link JobStatusResult} values to the {@code parsed-data} Kafka topic.
+ *
+ * <p>The instance owns a Kafka producer; call {@link #close()} when it is no longer needed.
+ */
+public class MessageProducer implements AutoCloseable {
+    // TODO: topic and broker address are hard-coded; they should come from configuration.
+    private final static String TOPIC = "parsed-data";
+    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
+
+    final Producer<String, JobStatusResult> producer;
+
+    /** Creates the underlying Kafka producer. */
+    public MessageProducer() {
+        producer = createProducer();
+    }
+
+    /**
+     * Builds a producer with string keys and {@link JobStatusResultSerializer} values.
+     *
+     * @return the configured producer
+     */
+    private Producer<String, JobStatusResult> createProducer() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                BOOTSTRAP_SERVERS);
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                JobStatusResultSerializer.class.getName());
+        return new KafkaProducer<String, JobStatusResult>(props);
+    }
+
+    /**
+     * Sends one job status result to the topic (no record key) and waits for the send to finish.
+     *
+     * @param jobStatusResult the value to publish
+     * @throws ExecutionException   if the send fails
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException {
+        final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(TOPIC, jobStatusResult);
+        // Wait on the returned future so a failed send is reported to the caller as an exception.
+        producer.send(record).get();
+    }
+
+    /** Closes the underlying Kafka producer. */
+    @Override
+    public void close() {
+        producer.close();
+    }
+}
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.