You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/11/09 14:51:28 UTC
[airavata] branch develop updated: Attaching ParserTriggering Task
to the post workflow to integrate with parser framework
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new b2c00c9 Attaching ParserTriggering Task to the post workflow to integrate with parser framework
b2c00c9 is described below
commit b2c00c92320f5302751e83eca6a0e6d1b9a790f6
Author: Dimuthu Wannipurage <di...@datasprouts.com>
AuthorDate: Fri Nov 9 09:51:19 2018 -0500
Attaching ParserTriggering Task to the post workflow to integrate with parser framework
---
.../helix/impl/participant/GlobalParticipant.java | 4 +-
.../impl/task/parsing/ParsingTriggeringTask.java | 69 ++++++++++++++++++++++
.../ProcessCompletionMessage.java | 2 +-
.../ProcessCompletionMessageDeserializer.java | 4 +-
.../kafka/ProcessCompletionMessageSerializer.java | 4 +-
.../helix/impl/workflow/ParserWorkflowManager.java | 8 +--
.../helix/impl/workflow/PostWorkflowManager.java | 12 ++++
7 files changed, 93 insertions(+), 10 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index a970d1e..a36e092 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -44,7 +44,9 @@ public class GlobalParticipant extends HelixParticipant<AbstractTask> {
"org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask",
"org.apache.airavata.helix.impl.task.cancel.RemoteJobCancellationTask",
"org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask",
- "org.apache.airavata.helix.impl.task.parsing.DataParsingTask"
+ "org.apache.airavata.helix.impl.task.parsing.DataParsingTask",
+ "org.apache.airavata.helix.impl.task.parsing.ParsingTriggeringTask"
+
};
@SuppressWarnings("WeakerAccess")
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
new file mode 100644
index 0000000..ca7d5c2
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
@@ -0,0 +1,69 @@
+package org.apache.airavata.helix.impl.task.parsing;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.task.TaskResult;
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.airavata.helix.impl.task.parsing.kafka.ProcessCompletionMessageSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+@TaskDef(name = "Parsing Triggering Task")
+public class ParsingTriggeringTask extends AiravataTask {
+
+ private final static Logger logger = LoggerFactory.getLogger(DataParsingTask.class);
+
+ private Producer<String, ProcessCompletionMessage> producer;
+
+ private void createProducer() throws ApplicationSettingsException {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ ServerSettings.getSetting("kafka.parsing.broker.url"));
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.publisher.id"));
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProcessCompletionMessageSerializer.class.getName());
+ this.producer = producer;
+ }
+
+ public void submitMessageToParserEngine(ProcessCompletionMessage completionMessage) throws ExecutionException, InterruptedException, ApplicationSettingsException {
+ final ProducerRecord<String, ProcessCompletionMessage> record = new ProducerRecord<>(
+ ServerSettings.getSetting("kafka.parser.topic"),
+ completionMessage.getExperimentId(),
+ completionMessage);
+ RecordMetadata recordMetadata = producer.send(record).get();
+ producer.flush();
+ }
+
+ @Override
+ public TaskResult onRun(TaskHelper helper, TaskContext taskContext) {
+
+ logger.info("Starting parsing triggerring task " + getTaskId() + ", experiment id " + getExperimentId());
+
+ ProcessCompletionMessage completionMessage = new ProcessCompletionMessage();
+ completionMessage.setExperimentId(getExperimentId());
+ completionMessage.setProcessId(getProcessId());
+ completionMessage.setGatewayId(getGatewayId());
+
+ try {
+ createProducer();
+ submitMessageToParserEngine(completionMessage);
+ } catch (Exception e) {
+ logger.error("Failed to submit completion message to parsing engine", e);
+ }
+ return onSuccess("Successfully completed parsing triggering task");
+ }
+
+ @Override
+ public void onCancel(TaskContext taskContext) {
+
+ }
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/ProcessCompletionMessage.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ProcessCompletionMessage.java
similarity index 91%
rename from modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/ProcessCompletionMessage.java
rename to modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ProcessCompletionMessage.java
index e396846..f53e33c 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/ProcessCompletionMessage.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/ProcessCompletionMessage.java
@@ -1,4 +1,4 @@
-package org.apache.airavata.helix.impl.task.completing;
+package org.apache.airavata.helix.impl.task.parsing;
public class ProcessCompletionMessage {
private String processId;
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/kafka/ProcessCompletionMessageDeserializer.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/kafka/ProcessCompletionMessageDeserializer.java
similarity index 83%
rename from modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/kafka/ProcessCompletionMessageDeserializer.java
rename to modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/kafka/ProcessCompletionMessageDeserializer.java
index 9a2a938..68f0872 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/kafka/ProcessCompletionMessageDeserializer.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/kafka/ProcessCompletionMessageDeserializer.java
@@ -1,6 +1,6 @@
-package org.apache.airavata.helix.impl.task.completing.kafka;
+package org.apache.airavata.helix.impl.task.parsing.kafka;
-import org.apache.airavata.helix.impl.task.completing.ProcessCompletionMessage;
+import org.apache.airavata.helix.impl.task.parsing.ProcessCompletionMessage;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/kafka/ProcessCompletionMessageSerializer.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/kafka/ProcessCompletionMessageSerializer.java
similarity index 79%
rename from modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/kafka/ProcessCompletionMessageSerializer.java
rename to modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/kafka/ProcessCompletionMessageSerializer.java
index 164ad95..c64a38e 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/completing/kafka/ProcessCompletionMessageSerializer.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/kafka/ProcessCompletionMessageSerializer.java
@@ -1,6 +1,6 @@
-package org.apache.airavata.helix.impl.task.completing.kafka;
+package org.apache.airavata.helix.impl.task.parsing.kafka;
-import org.apache.airavata.helix.impl.task.completing.ProcessCompletionMessage;
+import org.apache.airavata.helix.impl.task.parsing.ProcessCompletionMessage;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index b7db7f8..079ad07 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -23,8 +23,8 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.core.AbstractTask;
import org.apache.airavata.helix.core.OutPort;
-import org.apache.airavata.helix.impl.task.completing.ProcessCompletionMessage;
-import org.apache.airavata.helix.impl.task.completing.kafka.ProcessCompletionMessageDeserializer;
+import org.apache.airavata.helix.impl.task.parsing.ProcessCompletionMessage;
+import org.apache.airavata.helix.impl.task.parsing.kafka.ProcessCompletionMessageDeserializer;
import org.apache.airavata.helix.impl.task.parsing.*;
import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskInput;
import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskInputs;
@@ -308,8 +308,8 @@ public class ParserWorkflowManager extends WorkflowManager {
final Properties props = new Properties();
final Consumer<String, ProcessCompletionMessage> consumer = new KafkaConsumer<>(props);
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.broker.url"));
- props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("kafka.broker.consumer.group"));
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.parser.broker.url"));
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("kafka.parser.broker.consumer.group"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProcessCompletionMessageDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 3f86db5..78f76c9 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -27,6 +27,7 @@ import org.apache.airavata.helix.core.OutPort;
import org.apache.airavata.helix.core.util.MonitoringUtil;
import org.apache.airavata.helix.impl.task.*;
import org.apache.airavata.helix.impl.task.completing.CompletingTask;
+import org.apache.airavata.helix.impl.task.parsing.ParsingTriggeringTask;
import org.apache.airavata.helix.impl.task.staging.ArchiveTask;
import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
import org.apache.airavata.model.status.ProcessState;
@@ -214,8 +215,19 @@ public class PostWorkflowManager extends WorkflowManager {
if (allTasks.size() > 0) {
allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(completingTask.getTaskId(), completingTask));
}
+
allTasks.add(completingTask);
+ // Append a final task that notifies the parser framework once the completing task has run.
+ ParsingTriggeringTask parsingTriggeringTask = new ParsingTriggeringTask();
+ parsingTriggeringTask.setTaskId("Parsing-Triggering-Task");
+ parsingTriggeringTask.setGatewayId(experimentModel.getGatewayId());
+ parsingTriggeringTask.setExperimentId(experimentModel.getExperimentId());
+ parsingTriggeringTask.setProcessId(processModel.getProcessId());
+ parsingTriggeringTask.setSkipTaskStatusPublish(true);
+ // Chain completingTask -> parsingTriggeringTask. The original pointed completingTask at itself,
+ // which left the parsing task unlinked from the rest of the workflow.
+ completingTask.setNextTask(new OutPort(parsingTriggeringTask.getTaskId(), parsingTriggeringTask));
+
+ allTasks.add(parsingTriggeringTask);
+
String workflowName = getWorkflowOperator().launchWorkflow(processId + "-POST-" + UUID.randomUUID().toString(),
new ArrayList<>(allTasks), true, false);
try {