You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/01/16 17:14:36 UTC
[airavata] branch develop updated: Fixing the bug of consumer not initialization
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new 1c6f867 Fixing the bug of consumer not initialization
1c6f867 is described below
commit 1c6f86700479b7e5038386d1807087296ed77934
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Jan 16 12:14:26 2019 -0500
Fixing the bug of consumer not initialization
---
.../airavata/helix/impl/workflow/ParserWorkflowManager.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index 88ad888..53d2ad3 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -309,16 +309,20 @@ public class ParserWorkflowManager extends WorkflowManager {
}
private void runConsumer() throws ApplicationSettingsException {
+
final Properties props = new Properties();
- final Consumer<String, ProcessCompletionMessage> consumer = new KafkaConsumer<>(props);
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.parser.broker.url"));
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.url"));
props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("kafka.parser.broker.consumer.group"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProcessCompletionMessageDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ final Consumer<String, ProcessCompletionMessage> consumer = new KafkaConsumer<>(props);
+
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("kafka.parser.topic")));
+ logger.info("Starting the kafka consumer..");
+
while (true) {
final ConsumerRecords<String, ProcessCompletionMessage> consumerRecords = consumer.poll(Long.MAX_VALUE);