Posted to commits@airavata.apache.org by di...@apache.org on 2020/04/21 23:54:29 UTC

[airavata] branch master updated (7e7ed8a -> 9bebfa0)

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git.


    from 7e7ed8a  Ansible: update ssl paths for southdakota
     new a88c956  Logging improvements for email monitor
     new 9bebfa0  Fixing possible kafka read thread timeouts when processing post wm job status messages

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../helix/impl/workflow/PostWorkflowManager.java   | 36 +++++++++++++++++-----
 .../airavata/monitor/email/EmailBasedMonitor.java  |  4 ++-
 .../apache/airavata/monitor/AbstractMonitor.java   |  1 +
 3 files changed, 32 insertions(+), 9 deletions(-)


[airavata] 01/02: Logging improvements for email monitor

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit a88c9562e2a8213c37784b26ee25116bbfa3b793
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Apr 21 19:53:19 2020 -0400

    Logging improvements for email monitor
---
 .../java/org/apache/airavata/monitor/email/EmailBasedMonitor.java     | 4 +++-
 .../src/main/java/org/apache/airavata/monitor/AbstractMonitor.java    | 1 +
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/modules/job-monitor/email-monitor/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java b/modules/job-monitor/email-monitor/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
index 64f5b86..ddfd951 100644
--- a/modules/job-monitor/email-monitor/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
+++ b/modules/job-monitor/email-monitor/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
@@ -235,9 +235,11 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
         List<Message> unreadMessages = new ArrayList<>();
         for (Message message : searchMessages) {
             try {
+                log.info("Parsing the job status message");
                 JobStatusResult jobStatusResult = parse(message);
-                log.info(jobStatusResult.getJobId() + ", " + jobStatusResult.getJobName() + ", " + jobStatusResult.getState().getValue());
+                log.info("Job message parsed. Job Id " + jobStatusResult.getJobId() + ", Job Name " + jobStatusResult.getJobName() + ", Job State " + jobStatusResult.getState().getValue());
                 submitJobStatus(jobStatusResult);
+                log.info("Submitted the job {} status to queue", jobStatusResult.getJobId());
                 processedMessages.add(message);
             } catch (Exception e) {
                 log.error("Error in submitting job status to queue", e);
diff --git a/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java b/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
index 69bf883..a2581ad 100644
--- a/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
+++ b/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
@@ -55,6 +55,7 @@ public class AbstractMonitor {
         RegistryService.Client registryClient = getRegistryClientPool().getResource();
         boolean validated = true;
         try {
+            log.info("Fetching matching jobs for job id {} from registry", jobStatusResult.getJobId());
             List<JobModel> jobs = registryClient.getJobs("jobId", jobStatusResult.getJobId());
 
             if (jobs.size() > 0) {

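One readability note on the patch above: the added log lines mix plain string concatenation (the "Job message parsed..." line) with SLF4J's parameterized style (the "Submitted the job {}..." line). Both compile, but the parameterized form skips building the message string entirely when INFO is disabled. A minimal sketch of the parameterized style; the class and method below are illustrative stand-ins, not Airavata code:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class JobStatusLogging {

        private static final Logger log = LoggerFactory.getLogger(JobStatusLogging.class);

        // jobId, jobName and state mirror the three values logged in
        // EmailBasedMonitor; the method itself is hypothetical.
        void logParsed(String jobId, String jobName, String state) {
            // Placeholders are substituted only when INFO is enabled, so no
            // message string is built for suppressed log levels.
            log.info("Job message parsed. Job Id {}, Job Name {}, Job State {}",
                    jobId, jobName, state);
        }
    }
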

[airavata] 02/02: Fixing possible kafka read thread timeouts when processing post wm job status messages

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 9bebfa092d4497129422635a4a7ad4362129a1c2
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Apr 21 19:54:12 2020 -0400

    Fixing possible kafka read thread timeouts when processing post wm job status messages
---
 .../helix/impl/workflow/PostWorkflowManager.java   | 36 +++++++++++++++++-----
 1 file changed, 28 insertions(+), 8 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 43c7442..f3b9dea 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -55,12 +55,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
 public class PostWorkflowManager extends WorkflowManager {
 
     private final static Logger logger = LoggerFactory.getLogger(PostWorkflowManager.class);
 
+    private ExecutorService processingPool = Executors.newFixedThreadPool(10);
+
     public PostWorkflowManager() throws ApplicationSettingsException {
         super(ServerSettings.getSetting("post.workflow.manager.name"),
                 Boolean.parseBoolean(ServerSettings.getSetting("post.workflow.manager.loadbalance.clusters")));
@@ -303,22 +306,39 @@ public class PostWorkflowManager extends WorkflowManager {
         new Thread(() -> {
 
             while (true) {
+
                 final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(Long.MAX_VALUE);
+                CompletionService<Boolean> executorCompletionService = new ExecutorCompletionService<>(processingPool);
+                List<Future<Boolean>> processingFutures = new ArrayList<>();
 
                 for (TopicPartition partition : consumerRecords.partitions()) {
                     List<ConsumerRecord<String, JobStatusResult>> partitionRecords = consumerRecords.records(partition);
+                    logger.info("Received job records {}", partitionRecords.size());
+
                     for (ConsumerRecord<String, JobStatusResult> record : partitionRecords) {
-                        boolean success = process(record.value());
-                        logger.info("Status of processing " + record.value().getJobId() + " : " + success);
-                        if (success) {
-                            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
-                        }
+                        logger.info("Submitting {} to process in thread pool", record.value().getJobId());
+
+                        // This avoids the kafka read thread having to wait until processing completes before committing.
+                        // There is a risk of missing 20 messages in case of a restart, but this improves
+                        // the robustness of the kafka read thread by avoiding wait timeouts.
+                        processingFutures.add(executorCompletionService.submit(() -> {
+                            boolean success = process(record.value());
+                            logger.info("Status of processing " + record.value().getJobId() + " : " + success);
+                            return success;
+                        }));
+
+                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
                     }
                 }
 
-                consumerRecords.forEach(record -> process(record.value()));
-
-                consumer.commitAsync();
+                for (Future<Boolean> f : processingFutures) {
+                    try {
+                        executorCompletionService.take().get();
+                    } catch (Exception e) {
+                        logger.error("Failed processing job", e);
+                    }
+                }
+                logger.info("All messages processed. Moving to next round");
             }
         }).start();
     }
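
Taken together, the new loop follows a submit-then-commit pattern: each record is handed to a fixed pool of 10 workers through an ExecutorCompletionService, its offset is committed immediately, and the thread only blocks at the end of the batch to drain results. A self-contained sketch of that pattern under the same assumptions; the process() stub and the job strings below are illustrative, not the Airavata code:

    import java.util.List;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CommitThenProcess {

        // Stand-in for PostWorkflowManager.process(JobStatusResult).
        static boolean process(String record) {
            return true;
        }

        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(10);
            CompletionService<Boolean> completion = new ExecutorCompletionService<>(pool);

            // Stand-in for the records returned by one consumer.poll() call.
            List<String> batch = List.of("job-1", "job-2", "job-3");

            int submitted = 0;
            for (String record : batch) {
                completion.submit(() -> process(record));
                submitted++;
                // The real code commits the Kafka offset right here, before the
                // result is known, so the poll loop never blocks on process().
            }

            // Drain results in completion order; take() blocks until one is done.
            for (int i = 0; i < submitted; i++) {
                try {
                    completion.take().get();
                } catch (ExecutionException e) {
                    // A failed record is only logged; its offset is already committed.
                }
            }
            pool.shutdown();
        }
    }

Committing before the result is known makes delivery at-most-once: a restart while futures are in flight drops those records, which is the risk the in-diff comment accepts in exchange for never stalling the poll loop past Kafka's session timeout. The previous code had the opposite tradeoff, committing only after process() returned. Note also that the committed drain loop iterates over processingFutures but pulls results with take(); the futures list is used only for its size.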