Posted to commits@airavata.apache.org by di...@apache.org on 2020/04/21 23:54:31 UTC

[airavata] 02/02: Fixing possible Kafka read thread timeouts when processing post workflow manager job status messages

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 9bebfa092d4497129422635a4a7ad4362129a1c2
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Apr 21 19:54:12 2020 -0400

    Fixing possible Kafka read thread timeouts when processing post workflow manager job status messages
---
 .../helix/impl/workflow/PostWorkflowManager.java   | 36 +++++++++++++++++-----
 1 file changed, 28 insertions(+), 8 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 43c7442..f3b9dea 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -55,12 +55,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
 public class PostWorkflowManager extends WorkflowManager {
 
     private final static Logger logger = LoggerFactory.getLogger(PostWorkflowManager.class);
 
+    private final ExecutorService processingPool = Executors.newFixedThreadPool(10);
+
     public PostWorkflowManager() throws ApplicationSettingsException {
         super(ServerSettings.getSetting("post.workflow.manager.name"),
                 Boolean.parseBoolean(ServerSettings.getSetting("post.workflow.manager.loadbalance.clusters")));
@@ -303,22 +306,39 @@ public class PostWorkflowManager extends WorkflowManager {
         new Thread(() -> {
 
             while (true) {
+
                 final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(Long.MAX_VALUE);
+                CompletionService<Boolean> executorCompletionService = new ExecutorCompletionService<>(processingPool);
+                List<Future<Boolean>> processingFutures = new ArrayList<>();
 
                 for (TopicPartition partition : consumerRecords.partitions()) {
                     List<ConsumerRecord<String, JobStatusResult>> partitionRecords = consumerRecords.records(partition);
+                    logger.info("Received job records {}", partitionRecords.size());
+
                     for (ConsumerRecord<String, JobStatusResult> record : partitionRecords) {
-                        boolean success = process(record.value());
-                        logger.info("Status of processing " + record.value().getJobId() + " : " + success);
-                        if (success) {
-                            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
-                        }
+                        logger.info("Submitting {} to process in thread pool", record.value().getJobId());
+
+                        // This prevents the Kafka read thread from waiting for processing to finish before committing.
+                        // There is a risk of missing up to 20 messages in case of a restart, but committing eagerly
+                        // makes the Kafka read thread more robust by avoiding poll wait timeouts.
+                        processingFutures.add(executorCompletionService.submit(() -> {
+                            boolean success = process(record.value());
+                            logger.info("Status of processing " + record.value().getJobId() + " : " + success);
+                            return success;
+                        }));
+
+                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
                     }
                 }
 
-                consumerRecords.forEach(record -> process(record.value()));
-
-                consumer.commitAsync();
+                for (int i = 0; i < processingFutures.size(); i++) {
+                    try {
+                        executorCompletionService.take().get();
+                    } catch (Exception e) {
+                        logger.error("Failed processing job", e);
+                    }
+                }
+                logger.info("All messages processed. Moving to next round");
             }
         }).start();
     }
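
For context on the pattern this commit introduces: each record polled from Kafka is handed to a fixed
thread pool through an ExecutorCompletionService, its offset is committed immediately, and the poll
thread drains all completions before polling again. Below is a minimal, self-contained sketch of that
pattern, not the committed code: the class name DecoupledConsumerSketch is hypothetical, String values
stand in for JobStatusResult, and process() is a stub for PostWorkflowManager's processing logic.

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class DecoupledConsumerSketch {

    private final ExecutorService processingPool = Executors.newFixedThreadPool(10);

    // Stub for the real processing logic (PostWorkflowManager.process); returns true on success.
    private boolean process(String value) {
        return true;
    }

    public void run(KafkaConsumer<String, String> consumer) {
        while (true) {
            // Bounded poll here; the commit itself passes Long.MAX_VALUE to block until records arrive.
            ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(30));
            CompletionService<Boolean> completions = new ExecutorCompletionService<>(processingPool);
            int submitted = 0;

            for (TopicPartition partition : batch.partitions()) {
                for (ConsumerRecord<String, String> record : batch.records(partition)) {
                    // Hand processing to the pool so the poll thread never blocks on it.
                    completions.submit(() -> process(record.value()));
                    submitted++;
                    // Commit eagerly: at-most-once delivery, but the poll loop stays responsive.
                    consumer.commitSync(Collections.singletonMap(
                            partition, new OffsetAndMetadata(record.offset() + 1)));
                }
            }

            // Drain every submitted task before polling again, so the pool is idle for the next round.
            for (int i = 0; i < submitted; i++) {
                try {
                    completions.take().get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status and stop
                    return;
                } catch (ExecutionException e) {
                    // The offset is already committed; log the failure and continue in real code.
                }
            }
        }
    }
}

The trade-off is deliberate: committing before processing gives at-most-once delivery (a crash can drop
in-flight messages, as the comment in the commit notes), but in exchange the poll thread can no longer
stall past Kafka's max.poll.interval.ms while waiting on slow handlers, which would otherwise mark the
consumer dead and trigger a partition rebalance.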