You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/11/09 14:51:28 UTC

[airavata] branch develop updated: Attaching ParserTriggering Task to the post workflow to integrate with parser framework

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new b2c00c9  Attaching ParserTriggering Task to the post workflow to integrate with parser framework
b2c00c9 is described below

commit b2c00c92320f5302751e83eca6a0e6d1b9a790f6
Author: Dimuthu Wannipurage <di...@datasprouts.com>
AuthorDate: Fri Nov 9 09:51:19 2018 -0500

    Attaching ParserTriggering Task to the post workflow to integrate with parser framework
---
 .../helix/impl/participant/GlobalParticipant.java  |  4 +-
 .../impl/task/parsing/ParsingTriggeringTask.java   | 69 ++++++++++++++++++++++
 .../ProcessCompletionMessage.java                  |  2 +-
 .../ProcessCompletionMessageDeserializer.java      |  4 +-
 .../kafka/ProcessCompletionMessageSerializer.java  |  4 +-
 .../helix/impl/workflow/ParserWorkflowManager.java |  8 +--
 .../helix/impl/workflow/PostWorkflowManager.java   | 12 ++++
 7 files changed, 93 insertions(+), 10 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index a970d1e..a36e092 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -44,7 +44,9 @@ public class GlobalParticipant extends HelixParticipant<AbstractTask> {
             "org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask",
             "org.apache.airavata.helix.impl.task.cancel.RemoteJobCancellationTask",
             "org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask",
-            "org.apache.airavata.helix.impl.task.parsing.DataParsingTask"
+            "org.apache.airavata.helix.impl.task.parsing.DataParsingTask",
+            "org.apache.airavata.helix.impl.task.parsing.ParsingTriggeringTask"
+
     };
 
     @SuppressWarnings("WeakerAccess")
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
new file mode 100644
index 0000000..ca7d5c2
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
@@ -0,0 +1,69 @@
+package org.apache.airavata.helix.impl.task.parsing;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.task.TaskResult;
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.airavata.helix.impl.task.parsing.kafka.ProcessCompletionMessageSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+@TaskDef(name = "Parsing Triggering Task")
+public class ParsingTriggeringTask extends AiravataTask {
+
+    private final static Logger logger = LoggerFactory.getLogger(DataParsingTask.class);
+
+    private Producer<String, ProcessCompletionMessage> producer;
+
+    private void createProducer() throws ApplicationSettingsException {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                ServerSettings.getSetting("kafka.parsing.broker.url"));
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.publisher.id"));
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProcessCompletionMessageSerializer.class.getName());
+        this.producer = producer;
+    }
+
+    public void submitMessageToParserEngine(ProcessCompletionMessage completionMessage) throws ExecutionException, InterruptedException, ApplicationSettingsException {
+        final ProducerRecord<String, ProcessCompletionMessage> record = new ProducerRecord<>(
+                ServerSettings.getSetting("kafka.parser.topic"),
+                completionMessage.getExperimentId(),
+                completionMessage);
+        RecordMetadata recordMetadata = producer.send(record).get();
+        producer.flush();
+    }
+
+    @Override
+    public TaskResult onRun(TaskHelper helper, TaskContext taskContext) {
+
+        logger.info("Starting parsing triggerring task " + getTaskId() + ", experiment id " + getExperimentId());
+
+        ProcessCompletionMessage completionMessage = new ProcessCompletionMessage();
+        completionMessage.setExperimentId(getExperimentId());
+        completionMessage.setProcessId(getProcessId());
+        completionMessage.setGatewayId(getGatewayId());
+
+        try {
+            createProducer();
+            submitMessageToParserEngine(completionMessage);
+        } catch (Exception e) {
+            logger.error("Failed to submit completion message to parsing engine", e);
+        }
+        return onSuccess("Successfully completed parsing triggering task");
+    }
+
+    @Override
+    public void onCancel(TaskContext taskContext) {
+
+    }
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/ProcessCompletionMessage.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ProcessCompletionMessage.java
similarity index 91%
rename from modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/ProcessCompletionMessage.java
rename to modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ProcessCompletionMessage.java
index e396846..f53e33c 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/ProcessCompletionMessage.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ProcessCompletionMessage.java
@@ -1,4 +1,4 @@
-package org.apache.airavata.helix.impl.task.completing;
+package org.apache.airavata.helix.impl.task.parsing;
 
 public class ProcessCompletionMessage {
     private String processId;
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/kafka/ProcessCompletionMessageDeserializer.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/kafka/ProcessCompletionMessageDeserializer.java
similarity index 83%
rename from modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/kafka/ProcessCompletionMessageDeserializer.java
rename to modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/kafka/ProcessCompletionMessageDeserializer.java
index 9a2a938..68f0872 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/kafka/ProcessCompletionMessageDeserializer.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/kafka/ProcessCompletionMessageDeserializer.java
@@ -1,6 +1,6 @@
-package org.apache.airavata.helix.impl.task.completing.kafka;
+package org.apache.airavata.helix.impl.task.parsing.kafka;
 
-import org.apache.airavata.helix.impl.task.completing.ProcessCompletionMessage;
+import org.apache.airavata.helix.impl.task.parsing.ProcessCompletionMessage;
 import org.apache.kafka.common.serialization.Deserializer;
 
 import java.util.Map;
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/kafka/ProcessCompletionMessageSerializer.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/kafka/ProcessCompletionMessageSerializer.java
similarity index 79%
rename from modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/kafka/ProcessCompletionMessageSerializer.java
rename to modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/kafka/ProcessCompletionMessageSerializer.java
index 164ad95..c64a38e 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/kafka/ProcessCompletionMessageSerializer.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/kafka/ProcessCompletionMessageSerializer.java
@@ -1,6 +1,6 @@
-package org.apache.airavata.helix.impl.task.completing.kafka;
+package org.apache.airavata.helix.impl.task.parsing.kafka;
 
-import org.apache.airavata.helix.impl.task.completing.ProcessCompletionMessage;
+import org.apache.airavata.helix.impl.task.parsing.ProcessCompletionMessage;
 import org.apache.kafka.common.serialization.Serializer;
 
 import java.util.Map;
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index b7db7f8..079ad07 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -23,8 +23,8 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.core.AbstractTask;
 import org.apache.airavata.helix.core.OutPort;
-import org.apache.airavata.helix.impl.task.completing.ProcessCompletionMessage;
-import org.apache.airavata.helix.impl.task.completing.kafka.ProcessCompletionMessageDeserializer;
+import org.apache.airavata.helix.impl.task.parsing.ProcessCompletionMessage;
+import org.apache.airavata.helix.impl.task.parsing.kafka.ProcessCompletionMessageDeserializer;
 import org.apache.airavata.helix.impl.task.parsing.*;
 import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskInput;
 import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskInputs;
@@ -308,8 +308,8 @@ public class ParserWorkflowManager extends WorkflowManager {
         final Properties props = new Properties();
         final Consumer<String, ProcessCompletionMessage> consumer = new KafkaConsumer<>(props);
 
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.broker.url"));
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("kafka.broker.consumer.group"));
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.parser.broker.url"));
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("kafka.parser.broker.consumer.group"));
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProcessCompletionMessageDeserializer.class.getName());
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 3f86db5..78f76c9 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -27,6 +27,7 @@ import org.apache.airavata.helix.core.OutPort;
 import org.apache.airavata.helix.core.util.MonitoringUtil;
 import org.apache.airavata.helix.impl.task.*;
 import org.apache.airavata.helix.impl.task.completing.CompletingTask;
+import org.apache.airavata.helix.impl.task.parsing.ParsingTriggeringTask;
 import org.apache.airavata.helix.impl.task.staging.ArchiveTask;
 import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
 import org.apache.airavata.model.status.ProcessState;
@@ -214,8 +215,19 @@ public class PostWorkflowManager extends WorkflowManager {
                         if (allTasks.size() > 0) {
                             allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(completingTask.getTaskId(), completingTask));
                         }
+
                         allTasks.add(completingTask);
 
+                        ParsingTriggeringTask parsingTriggeringTask = new ParsingTriggeringTask();
+                        parsingTriggeringTask.setTaskId("Parsing-Triggering-Task");
+                        parsingTriggeringTask.setGatewayId(experimentModel.getGatewayId());
+                        parsingTriggeringTask.setExperimentId(experimentModel.getExperimentId());
+                        parsingTriggeringTask.setProcessId(processModel.getProcessId());
+                        parsingTriggeringTask.setSkipTaskStatusPublish(true);
+                        completingTask.setNextTask(new OutPort(completingTask.getTaskId(), completingTask));
+
+                        allTasks.add(parsingTriggeringTask);
+
                         String workflowName = getWorkflowOperator().launchWorkflow(processId + "-POST-" + UUID.randomUUID().toString(),
                                 new ArrayList<>(allTasks), true, false);
                         try {