You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/05/02 20:32:21 UTC

[airavata] branch master updated: Bringing back publishing process status over message bus

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/master by this push:
     new b933d96  Bringing back publishing process status over message bus
b933d96 is described below

commit b933d963904e2dc3d56b5db6640b8d4cf4cea3dc
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu May 2 16:32:13 2019 -0400

    Bringing back publishing process status over message bus
---
 .../apache/airavata/helix/impl/task/AiravataTask.java | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 0d5cfe1..b3380c9 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -189,12 +189,12 @@ public abstract class AiravataTask extends AbstractTask {
                 status.setTimeOfStateChange(status.getTimeOfStateChange());
             }
             getRegistryServiceClient().addProcessStatus(status, getProcessId());
-            /*ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
+            ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
             ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
             MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
                     AiravataUtils.getId(MessageType.PROCESS.name()), getGatewayId());
             msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-            getStatusPublisher().publish(msgCtx);*/
+            getStatusPublisher().publish(msgCtx);
         } catch (Exception e) {
             logger.error("Failed to save process status of process " + getProcessId(), e);
         }
@@ -218,13 +218,13 @@ public abstract class AiravataTask extends AbstractTask {
 
             getRegistryServiceClient().addJobStatus(jobStatus, taskId, jobId);
 
-            /*JobIdentifier identifier = new JobIdentifier(jobId, taskId, processId, experimentId, gateway);
+            JobIdentifier identifier = new JobIdentifier(jobId, taskId, processId, experimentId, gateway);
 
             JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
             MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
                     (MessageType.JOB.name()), gateway);
             msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-            getStatusPublisher().publish(msgCtx);*/
+            getStatusPublisher().publish(msgCtx);
 
         } catch (Exception e) {
             logger.error("Error persisting job status " + e.getLocalizedMessage(), e);
@@ -298,6 +298,17 @@ public abstract class AiravataTask extends AbstractTask {
         }
     }
 
+    protected Publisher getStatusPublisher() throws AiravataException { // Returns the status publisher, creating it on first use while the field is still null.
+        if (statusPublisher == null) { // first check is made without holding any lock
+            synchronized (RabbitMQPublisher.class) { // lock is the RabbitMQPublisher class monitor, not this task instance
+                if (statusPublisher == null) { // re-check under the lock so a thread that waited does not create a second publisher
+                    statusPublisher = MessagingFactory.getPublisher(Type.STATUS); // NOTE(review): field declaration is outside this hunk - confirm it is volatile, otherwise the unlocked check above is not safely published
+                }
+            }
+        }
+        return statusPublisher; // whatever MessagingFactory returned; presumably non-null - TODO confirm
+    }
+
     @Override
     public TaskResult onRun(TaskHelper helper) {