You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/01/16 17:13:57 UTC

[airavata] branch develop updated: Making a singleton producer

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new 448ed3c  Making a singleton producer
448ed3c is described below

commit 448ed3c494cf4486627af6a63ca99ea46f31f530
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Jan 16 12:13:46 2019 -0500

    Making a singleton producer
---
 .../impl/task/parsing/ParsingTriggeringTask.java    | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
index 0e275a4..dc1f157 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
@@ -21,17 +21,20 @@ public class ParsingTriggeringTask extends AiravataTask {
 
     private final static Logger logger = LoggerFactory.getLogger(DataParsingTask.class);
 
-    private Producer<String, ProcessCompletionMessage> producer;
+    private static Producer<String, ProcessCompletionMessage> producer;
 
     private void createProducer() throws ApplicationSettingsException {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                ServerSettings.getSetting("kafka.parsing.broker.url"));
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.publisher.id"));
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                StringSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProcessCompletionMessageSerializer.class.getName());
-        this.producer = new KafkaProducer<String, ProcessCompletionMessage>(props);;
+
+        if (producer == null) {
+            Properties props = new Properties();
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    ServerSettings.getSetting("kafka.parsing.broker.url"));
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.publisher.id"));
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                    StringSerializer.class.getName());
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProcessCompletionMessageSerializer.class.getName());
+            producer = new KafkaProducer<String, ProcessCompletionMessage>(props);
+        }
     }
 
     public void submitMessageToParserEngine(ProcessCompletionMessage completionMessage) throws ExecutionException, InterruptedException, ApplicationSettingsException {