You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/05/02 20:32:21 UTC
[airavata] branch master updated: Bringing back publishing process
status over message bus
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new b933d96 Bringing back publishing process status over message bus
b933d96 is described below
commit b933d963904e2dc3d56b5db6640b8d4cf4cea3dc
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu May 2 16:32:13 2019 -0400
Bringing back publishing process status over message bus
---
.../apache/airavata/helix/impl/task/AiravataTask.java | 19 +++++++++++++++----
1 file changed, 15 insertions(+), 4 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 0d5cfe1..b3380c9 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -189,12 +189,12 @@ public abstract class AiravataTask extends AbstractTask {
status.setTimeOfStateChange(status.getTimeOfStateChange());
}
getRegistryServiceClient().addProcessStatus(status, getProcessId());
- /*ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
+ ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
AiravataUtils.getId(MessageType.PROCESS.name()), getGatewayId());
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);*/
+ getStatusPublisher().publish(msgCtx);
} catch (Exception e) {
logger.error("Failed to save process status of process " + getProcessId(), e);
}
@@ -218,13 +218,13 @@ public abstract class AiravataTask extends AbstractTask {
getRegistryServiceClient().addJobStatus(jobStatus, taskId, jobId);
- /*JobIdentifier identifier = new JobIdentifier(jobId, taskId, processId, experimentId, gateway);
+ JobIdentifier identifier = new JobIdentifier(jobId, taskId, processId, experimentId, gateway);
JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
(MessageType.JOB.name()), gateway);
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);*/
+ getStatusPublisher().publish(msgCtx);
} catch (Exception e) {
logger.error("Error persisting job status " + e.getLocalizedMessage(), e);
@@ -298,6 +298,17 @@ public abstract class AiravataTask extends AbstractTask {
}
}
+ protected Publisher getStatusPublisher() throws AiravataException {
+ if (statusPublisher == null) {
+ synchronized (RabbitMQPublisher.class) {
+ if (statusPublisher == null) {
+ statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
+ }
+ }
+ }
+ return statusPublisher;
+ }
+
@Override
public TaskResult onRun(TaskHelper helper) {