You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/04/21 23:54:31 UTC
[airavata] 02/02: Fixing possible kafka read thread timeouts when
processing post wm job status messages
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 9bebfa092d4497129422635a4a7ad4362129a1c2
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Apr 21 19:54:12 2020 -0400
Fixing possible kafka read thread timeouts when processing post wm job status messages
---
.../helix/impl/workflow/PostWorkflowManager.java | 36 +++++++++++++++++-----
1 file changed, 28 insertions(+), 8 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 43c7442..f3b9dea 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -55,12 +55,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
+import java.util.concurrent.*;
import java.util.stream.Collectors;
public class PostWorkflowManager extends WorkflowManager {
private final static Logger logger = LoggerFactory.getLogger(PostWorkflowManager.class);
+ private ExecutorService processingPool = Executors.newFixedThreadPool(10);
+
public PostWorkflowManager() throws ApplicationSettingsException {
super(ServerSettings.getSetting("post.workflow.manager.name"),
Boolean.parseBoolean(ServerSettings.getSetting("post.workflow.manager.loadbalance.clusters")));
@@ -303,22 +306,39 @@ public class PostWorkflowManager extends WorkflowManager {
new Thread(() -> {
while (true) {
+
final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(Long.MAX_VALUE);
+ CompletionService<Boolean> executorCompletionService= new ExecutorCompletionService<>(processingPool);
+ List<Future<Boolean>> processingFutures = new ArrayList<>();
for (TopicPartition partition : consumerRecords.partitions()) {
List<ConsumerRecord<String, JobStatusResult>> partitionRecords = consumerRecords.records(partition);
+ logger.info("Received job records {}", partitionRecords.size());
+
for (ConsumerRecord<String, JobStatusResult> record : partitionRecords) {
- boolean success = process(record.value());
- logger.info("Status of processing " + record.value().getJobId() + " : " + success);
- if (success) {
- consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
- }
+ logger.info("Submitting {} to process in thread pool", record.value().getJobId());
+
+ // This prevents the Kafka read thread from waiting for processing to complete before committing.
+ // There is a risk of missing 20 messages in case of a restart, but this improves the robustness
+ // of the Kafka read thread by avoiding wait timeouts.
+ processingFutures.add(executorCompletionService.submit(() -> {
+ boolean success = process(record.value());
+ logger.info("Status of processing " + record.value().getJobId() + " : " + success);
+ return success;
+ }));
+
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
}
}
- consumerRecords.forEach(record -> process(record.value()));
-
- consumer.commitAsync();
+ for (Future<Boolean> f: processingFutures) {
+ try {
+ executorCompletionService.take().get();
+ } catch (Exception e) {
+ logger.error("Failed processing job", e);
+ }
+ }
+ logger.info("All messages processed. Moving to next round");
}
}).start();
}