You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/01/16 17:13:57 UTC
[airavata] branch develop updated: Making a singleton producer
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new 448ed3c Making a singleton producer
448ed3c is described below
commit 448ed3c494cf4486627af6a63ca99ea46f31f530
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Jan 16 12:13:46 2019 -0500
Making a singleton producer
---
.../impl/task/parsing/ParsingTriggeringTask.java | 21 ++++++++++++---------
1 file changed, 12 insertions(+), 9 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
index 0e275a4..dc1f157 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
@@ -21,17 +21,20 @@ public class ParsingTriggeringTask extends AiravataTask {
private final static Logger logger = LoggerFactory.getLogger(DataParsingTask.class);
- private Producer<String, ProcessCompletionMessage> producer;
+ private static Producer<String, ProcessCompletionMessage> producer;
private void createProducer() throws ApplicationSettingsException {
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
- ServerSettings.getSetting("kafka.parsing.broker.url"));
- props.put(ProducerConfig.CLIENT_ID_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.publisher.id"));
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- StringSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProcessCompletionMessageSerializer.class.getName());
- this.producer = new KafkaProducer<String, ProcessCompletionMessage>(props);;
+
+ if (producer == null) {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ ServerSettings.getSetting("kafka.parsing.broker.url"));
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.publisher.id"));
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProcessCompletionMessageSerializer.class.getName());
+ producer = new KafkaProducer<String, ProcessCompletionMessage>(props);
+ }
}
public void submitMessageToParserEngine(ProcessCompletionMessage completionMessage) throws ExecutionException, InterruptedException, ApplicationSettingsException {