You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/01/16 17:14:36 UTC

[airavata] branch develop updated: Fixing the bug of consumer not initialization

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1c6f867  Fixing the bug of consumer not initialization
1c6f867 is described below

commit 1c6f86700479b7e5038386d1807087296ed77934
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Jan 16 12:14:26 2019 -0500

    Fixing the bug of consumer not initialization
---
 .../airavata/helix/impl/workflow/ParserWorkflowManager.java       | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index 88ad888..53d2ad3 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -309,16 +309,20 @@ public class ParserWorkflowManager extends WorkflowManager {
     }
 
     private void runConsumer() throws ApplicationSettingsException {
+
         final Properties props = new Properties();
-        final Consumer<String, ProcessCompletionMessage> consumer = new KafkaConsumer<>(props);
 
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.parser.broker.url"));
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.url"));
         props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("kafka.parser.broker.consumer.group"));
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProcessCompletionMessageDeserializer.class.getName());
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        final Consumer<String, ProcessCompletionMessage> consumer = new KafkaConsumer<>(props);
+
         consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("kafka.parser.topic")));
 
+        logger.info("Starting the kafka consumer..");
+
         while (true) {
             final ConsumerRecords<String, ProcessCompletionMessage> consumerRecords = consumer.poll(Long.MAX_VALUE);