You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:10:07 UTC

[airavata] 10/17: Improving status publishing

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 1c3a5d4eca7a3fd455adea1ce56437ec21499bd7
Author: dimuthu <di...@gmail.com>
AuthorDate: Mon Mar 5 08:46:53 2018 -0500

    Improving status publishing
---
 .../helix/impl/participant/GlobalParticipant.java  |   1 +
 .../airavata/helix/impl/task/AiravataTask.java     | 136 ++++++++++++++++++---
 .../airavata/helix/impl/task/CompletingTask.java   |  26 ++++
 .../airavata/helix/impl/task/EnvSetupTask.java     |   3 +
 .../helix/impl/task/InputDataStagingTask.java      |   7 +-
 .../helix/impl/task/OutputDataStagingTask.java     |   3 +
 .../airavata/helix/impl/task/TaskContext.java      |  16 +++
 .../submission/task/DefaultJobSubmissionTask.java  |  15 +--
 .../submission/task/ForkJobSubmissionTask.java     |   2 +-
 .../task/submission/task/JobSubmissionTask.java    |  43 +------
 .../submission/task/LocalJobSubmissionTask.java    |   4 +-
 .../helix/impl/workflow/PostWorkflowManager.java   |  88 +++++++++++--
 .../job/monitor/kafka/MessageProducer.java         |   1 +
 13 files changed, 266 insertions(+), 79 deletions(-)

diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index 984b277..fc3fbcb 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -18,6 +18,7 @@ public class GlobalParticipant extends HelixParticipant {
         "org.apache.airavata.helix.impl.task.EnvSetupTask",
         "org.apache.airavata.helix.impl.task.InputDataStagingTask",
         "org.apache.airavata.helix.impl.task.OutputDataStagingTask",
+        "org.apache.airavata.helix.impl.task.CompletingTask",
         "org.apache.airavata.helix.impl.task.submission.task.ForkJobSubmissionTask",
         "org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask",
         "org.apache.airavata.helix.impl.task.submission.task.LocalJobSubmissionTask"
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index e15195d..03dedf3 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -1,25 +1,21 @@
 package org.apache.airavata.helix.impl.task;
 
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.helix.core.AbstractTask;
 import org.apache.airavata.helix.core.OutPort;
 import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
 import org.apache.airavata.helix.task.api.annotation.TaskParam;
 import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingFactory;
 import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.Type;
+import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
-import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
-import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
-import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
-import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.airavata.model.messaging.event.TaskIdentifier;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.process.ProcessModel;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.status.*;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.helix.HelixManager;
@@ -27,7 +23,8 @@ import org.apache.helix.task.TaskResult;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.util.*;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 
 public abstract class AiravataTask extends AbstractTask {
 
@@ -61,17 +58,128 @@ public abstract class AiravataTask extends AbstractTask {
     }
 
+    /**
+     * Records a task failure: logs it, marks the process as FAILED in the task context,
+     * saves and publishes that process status, stores an error model against both the
+     * experiment and the process, and builds the Helix task result.
+     *
+     * @param reason short description of why the task failed
+     * @param fatal  when true the result status is FATAL_FAILED, otherwise FAILED
+     * @param error  the causing throwable, or null when there is none
+     * @return a task result carrying the composed error message
+     */
     protected TaskResult onFail(String reason, boolean fatal, Throwable error) {
+
         String errorMessage;
+        ProcessStatus status = new ProcessStatus(ProcessState.FAILED);
+        // Collects the text stored as the "actual error message" of the error model.
+        StringWriter errors = new StringWriter();
 
         if (error == null) {
             errorMessage = "Task " + getTaskId() + " failed due to " + reason;
+            // No throwable available, so the composed message itself is recorded.
+            errors.write(errorMessage);
+            status.setReason(errorMessage);
             logger.error(errorMessage);
+
         } else {
             errorMessage = "Task " + getTaskId() + " failed due to " + reason + ", " + error.getMessage();
+            status.setReason(errorMessage);
+            // With a throwable, the recorded text is its stack trace rather than the composed message.
+            error.printStackTrace(new PrintWriter(errors));
             logger.error(errorMessage, error);
         }
+        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        getTaskContext().setProcessStatus(status);
+
+        ErrorModel errorModel = new ErrorModel();
+        errorModel.setUserFriendlyMessage("GFac Worker throws an exception");
+        errorModel.setActualErrorMessage(errors.toString());
+        errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+
+        // The same error model instance is stored twice; each save assigns it a new error id first.
+        saveAndPublishProcessStatus();
+        saveExperimentError(errorModel);
+        saveProcessError(errorModel);
         return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, errorMessage);
+    }
 
+    /**
+     * Creates a process status for the given state, stamps it with the current time,
+     * stores it in the task context and then saves and publishes it.
+     *
+     * @param state the process state to record
+     */
+    public void saveAndPublishProcessStatus(ProcessState state) {
+        final ProcessStatus newStatus = new ProcessStatus(state);
+        final long changedAt = AiravataUtils.getCurrentTimestamp().getTime();
+        newStatus.setTimeOfStateChange(changedAt);
+
+        getTaskContext().setProcessStatus(newStatus);
+        saveAndPublishProcessStatus();
+    }
+
+    public void saveAndPublishProcessStatus() {
+        try {
+            ProcessStatus status = taskContext.getProcessStatus();
+            if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){
+                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            }else {
+                status.setTimeOfStateChange(status.getTimeOfStateChange());
+            }
+            experimentCatalog.add(ExpCatChildDataType.PROCESS_STATUS, status, getProcessId());
+            ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
+            ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
+            MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+                    AiravataUtils.getId(MessageType.PROCESS.name()), getGatewayId());
+            msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            getStatusPublisher().publish(msgCtx);
+        } catch (Exception e) {
+            logger.error("Failed to save process status of process " + getProcessId(), e);
+        }
+    }
+
+    public void saveAndPublishTaskStatus() {
+        try {
+            TaskState state = getTaskContext().getTaskState();
+            // first we save job jobModel to the registry for sa and then save the job status.
+            TaskStatus status = getTaskContext().getTaskStatus();
+            if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){
+                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            }else {
+                status.setTimeOfStateChange(status.getTimeOfStateChange());
+            }
+            experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, status, getTaskId());
+            TaskIdentifier identifier = new TaskIdentifier(getTaskId(), getProcessId(), getExperimentId(), getGatewayId());
+            TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(state,
+                    identifier);
+            MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
+                    (MessageType.TASK.name()), getGatewayId());
+            msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            getStatusPublisher().publish(msgCtx);
+        } catch (Exception e) {
+            logger.error("Failed to publist task status of task " + getTaskId());
+        }
+    }
+
+    public void saveExperimentError(ErrorModel errorModel) {
+        try {
+            errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
+            getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, experimentId);
+        } catch (RegistryException e) {
+            String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " : - Error while updating experiment errors";
+            logger.error(msg, e);
+        }
+    }
+
+    public void saveProcessError(ErrorModel errorModel) {
+        try {
+            errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
+            experimentCatalog.add(ExpCatChildDataType.PROCESS_ERROR, errorModel, getProcessId());
+        } catch (RegistryException e) {
+            String msg = "expId: " + getExperimentId() + " processId: " + getProcessId()
+                    + " : - Error while updating process errors";
+            logger.error(msg, e);
+        }
+    }
+
+    /**
+     * Adds the given error to the experiment catalog as a task error.
+     *
+     * @param errorModel the error to store; its error id is overwritten with an id
+     *                   obtained from AiravataUtils.getId("TASK_ERROR")
+     * @throws Exception wrapping the registry exception when the catalog update fails
+     */
+    public void saveTaskError(ErrorModel errorModel) throws Exception {
+        try {
+            final String generatedId = AiravataUtils.getId("TASK_ERROR");
+            errorModel.setErrorId(generatedId);
+            getExperimentCatalog().add(ExpCatChildDataType.TASK_ERROR, errorModel, getTaskId());
+        } catch (RegistryException e) {
+            throw new Exception("expId: " + getExperimentId() + " processId: " + getProcessId()
+                    + " taskId: " + getTaskId() + " : - Error while updating task errors", e);
+        }
+    }
+
+    /**
+     * Returns the status publisher, creating it through
+     * MessagingFactory.getPublisher(Type.STATUS) the first time it is requested.
+     *
+     * @return the status publisher
+     * @throws AiravataException declared for the publisher creation call
+     */
+    public Publisher getStatusPublisher() throws AiravataException {
+        if (statusPublisher == null) {
+            // Creation is guarded by the RabbitMQPublisher class monitor; the null check is
+            // repeated inside the lock so a publisher created meanwhile is not replaced.
+            // NOTE(review): the statusPublisher field declaration is not visible here -
+            // confirm it is volatile, otherwise the unsynchronized read above is not safe.
+            synchronized (RabbitMQPublisher.class) {
+                if (statusPublisher == null) {
+                    statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
+                }
+            }
+        }
+        return statusPublisher;
     }
 
     @Override
@@ -145,10 +253,6 @@ public abstract class AiravataTask extends AbstractTask {
         return experimentCatalog;
     }
 
-    public Publisher getStatusPublisher() {
-        return statusPublisher;
-    }
-
     public String getProcessId() {
         return processId;
     }
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java
new file mode 100644
index 0000000..9ec2909
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java
@@ -0,0 +1,26 @@
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Task that records the COMPLETED process state and reports success.
+ */
+@TaskDef(name = "Completing Task")
+public class CompletingTask extends AiravataTask {
+
+    private static final Logger logger = LogManager.getLogger(CompletingTask.class);
+
+    /**
+     * Logs the completion, saves and publishes the COMPLETED process status and
+     * returns the success result.
+     *
+     * @param helper task helper; not used by this task
+     * @return the result produced by onSuccess
+     */
+    @Override
+    public TaskResult onRun(TaskHelper helper) {
+        final String completionMessage = "Process " + getProcessId() + " successfully completed";
+        logger.info(completionMessage);
+        saveAndPublishProcessStatus(ProcessState.COMPLETED);
+        return onSuccess(completionMessage);
+    }
+
+    /**
+     * No work is performed on cancellation.
+     */
+    @Override
+    public void onCancel() {
+        // intentionally empty
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
index ddba5f2..abdc1bf 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
@@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task;
 import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.helix.task.TaskResult;
@@ -17,6 +18,8 @@ public class EnvSetupTask extends AiravataTask {
     @Override
     public TaskResult onRun(TaskHelper taskHelper) {
         try {
+
+            saveAndPublishProcessStatus(ProcessState.CONFIGURING_WORKSPACE);
             publishTaskState(TaskState.EXECUTING);
             AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
                     getTaskContext().getGatewayId(),
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java
index 30eeec0..ed143dd 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java
@@ -3,20 +3,17 @@ package org.apache.airavata.helix.impl.task;
 import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.agents.api.AgentException;
 import org.apache.airavata.agents.api.StorageResourceAdaptor;
-import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
 import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
 import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.task.DataStagingTaskModel;
-import org.apache.airavata.registry.cpi.AppCatalogException;
-import org.apache.commons.io.FileUtils;
 import org.apache.helix.task.TaskResult;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 
@@ -29,6 +26,8 @@ public class InputDataStagingTask extends DataStagingTask {
     public TaskResult onRun(TaskHelper taskHelper) {
         logger.info("Starting Input Data Staging Task " + getTaskId());
 
+        saveAndPublishProcessStatus(ProcessState.INPUT_DATA_STAGING);
+
         try {
             // Get and validate data staging task model
             DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel();
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
index f33523c..ff8fd2e 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
@@ -7,6 +7,7 @@ import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
 import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.task.DataStagingTaskModel;
 import org.apache.airavata.registry.cpi.ExpCatChildDataType;
 import org.apache.airavata.registry.cpi.RegistryException;
@@ -29,6 +30,8 @@ public class OutputDataStagingTask extends DataStagingTask {
     public TaskResult onRun(TaskHelper taskHelper) {
 
         logger.info("Starting output data staging task " + getTaskId());
+        saveAndPublishProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
+
         try {
             // Get and validate data staging task model
             DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel();
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 489a196..6be1d36 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -21,6 +21,8 @@ import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskModel;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.AppCatalogException;
@@ -385,6 +387,20 @@ public class TaskContext {
             return null;
     }
 
+    public TaskState getTaskState() {
+        if(getCurrentTaskModel().getTaskStatuses() != null)
+            return getCurrentTaskModel().getTaskStatuses().get(0).getState();
+        else
+            return null;
+    }
+
+    public TaskStatus getTaskStatus() {
+        if(getCurrentTaskModel().getTaskStatuses() != null)
+            return getCurrentTaskModel().getTaskStatuses().get(0);
+        else
+            return null;
+    }
+
     public String getComputeResourceId() {
         if (isUseUserCRPref() &&
                 userComputeResourcePreference != null &&
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
index 31b6f30..688f894 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -5,24 +5,18 @@ import org.apache.airavata.agents.api.JobSubmissionOutput;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.helix.impl.task.submission.GroovyMapBuilder;
 import org.apache.airavata.helix.impl.task.submission.GroovyMapData;
-import org.apache.airavata.helix.impl.task.submission.SubmissionUtil;
 import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.status.JobState;
-import org.apache.airavata.model.status.JobStatus;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.status.*;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
-import org.apache.commons.io.FileUtils;
 import org.apache.helix.task.TaskResult;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -38,6 +32,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
     public TaskResult onRun(TaskHelper taskHelper) {
 
         try {
+            saveAndPublishProcessStatus(ProcessState.EXECUTING);
 
             GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build();
 
@@ -133,14 +128,14 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
                     jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
                     jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                     jobModel.setJobStatuses(Arrays.asList(jobStatus));
-                    saveJobStatus(jobModel);
+                    saveAndPublishJobStatus(jobModel);
 
                     if (verifyJobSubmissionByJobId(adaptor, jobId)) {
                         jobStatus.setJobState(JobState.QUEUED);
                         jobStatus.setReason("Verification step succeeded");
                         jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                         jobModel.setJobStatuses(Arrays.asList(jobStatus));
-                        saveJobStatus(jobModel);
+                        saveAndPublishJobStatus(jobModel);
                         createMonitoringNode(jobId);
                     }
 
@@ -172,7 +167,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
                             jobStatus.setReason("Verification step succeeded");
                             jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                             jobModel.setJobStatuses(Arrays.asList(jobStatus));
-                            saveJobStatus(jobModel);
+                            saveAndPublishJobStatus(jobModel);
                             //taskStatus.setState(TaskState.COMPLETED);
                             //taskStatus.setReason("Submitted job to compute resource");
                             //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
index 2e4a052..e3b5447 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
@@ -58,7 +58,7 @@ public class ForkJobSubmissionTask extends JobSubmissionTask {
                     jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
                     jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                     jobModel.setJobStatuses(Arrays.asList(jobStatus));
-                    saveJobStatus(jobModel);
+                    saveAndPublishJobStatus(jobModel);
 
                     return null;
                 } else {
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
index afa2630..4fed22d 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
@@ -73,6 +73,8 @@ public abstract class JobSubmissionTask extends AiravataTask {
         this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/lock", new byte[0]);
         this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/gateway", getGatewayId().getBytes());
         this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/process", getProcessId().getBytes());
+        this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/task", getTaskId().getBytes());
+        this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/experiment", getExperimentId().getBytes());
         this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/status", "pending".getBytes());
     }
 
@@ -146,48 +148,11 @@ public abstract class JobSubmissionTask extends AiravataTask {
         return jobManagerConfiguration.getParser().parseJobId(jobName, commandOutput.getStdOut());
     }
 
-    ////////////////////////////////
-
-
-    /////////////////////////////////////////////
-    public void saveExperimentError(ErrorModel errorModel) throws Exception {
-        try {
-            errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
-            getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, getExperimentId());
-        } catch (RegistryException e) {
-            String msg = "expId: " + getExperimentId() + " processId: " + getProcessId()
-                    + " : - Error while updating experiment errors";
-            throw new Exception(msg, e);
-        }
-    }
-
-    public void saveProcessError(ErrorModel errorModel) throws Exception {
-        try {
-            errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
-            getExperimentCatalog().add(ExpCatChildDataType.PROCESS_ERROR, errorModel, getProcessId());
-        } catch (RegistryException e) {
-            String msg = "expId: " + getExperimentId() + " processId: " + getProcessId()
-                    + " : - Error while updating process errors";
-            throw new Exception(msg, e);
-        }
-    }
-
-    public void saveTaskError(ErrorModel errorModel) throws Exception {
-        try {
-            errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR"));
-            getExperimentCatalog().add(ExpCatChildDataType.TASK_ERROR, errorModel, getTaskId());
-        } catch (RegistryException e) {
-            String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " taskId: " + getTaskId()
-                    + " : - Error while updating task errors";
-            throw new Exception(msg, e);
-        }
-    }
-
     public void saveJobModel(JobModel jobModel) throws RegistryException {
         getExperimentCatalog().add(ExpCatChildDataType.JOB, jobModel, getProcessId());
     }
 
-    public void saveJobStatus(JobModel jobModel) throws Exception {
+    public void saveAndPublishJobStatus(JobModel jobModel) throws Exception {
         try {
             // first we save job jobModel to the registry for sa and then save the job status.
             JobStatus jobStatus = null;
@@ -213,7 +178,7 @@ public abstract class JobSubmissionTask extends AiravataTask {
             MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
                     (MessageType.JOB.name()), getGatewayId());
             msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-            //getStatusPublisher().publish(msgCtx);
+            getStatusPublisher().publish(msgCtx);
         } catch (Exception e) {
             throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e);
         }
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
index e3ae4fa..cea6750 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
@@ -58,7 +58,7 @@ public class LocalJobSubmissionTask extends JobSubmissionTask {
                 jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                 jobModel.setJobStatuses(Arrays.asList(jobStatus));
 
-                saveJobStatus(jobModel);
+                saveAndPublishJobStatus(jobModel);
 
                 jobModel.setExitCode(submissionOutput.getExitCode());
                 jobModel.setStdErr(submissionOutput.getStdErr());
@@ -69,7 +69,7 @@ public class LocalJobSubmissionTask extends JobSubmissionTask {
                 jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                 jobModel.setJobStatuses(Arrays.asList(jobStatus));
 
-                saveJobStatus(jobModel);
+                saveAndPublishJobStatus(jobModel);
 
                 return null;
             }
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 383fe37..07a9aee 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -1,26 +1,33 @@
 package org.apache.airavata.helix.impl.workflow;
 
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.core.OutPort;
-import org.apache.airavata.helix.impl.task.AiravataTask;
-import org.apache.airavata.helix.impl.task.EnvSetupTask;
-import org.apache.airavata.helix.impl.task.InputDataStagingTask;
-import org.apache.airavata.helix.impl.task.OutputDataStagingTask;
+import org.apache.airavata.helix.impl.task.*;
 import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask;
 import org.apache.airavata.helix.impl.task.submission.task.JobSubmissionTask;
 import org.apache.airavata.helix.workflow.WorkflowManager;
 import org.apache.airavata.job.monitor.kafka.JobStatusResultDeserializer;
 import org.apache.airavata.job.monitor.parser.JobStatusResult;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.Type;
+import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
 import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.model.task.TaskModel;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.registry.cpi.ExperimentCatalog;
-import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.*;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -46,6 +53,7 @@ public class PostWorkflowManager {
     private final String TOPIC = "parsed-data";
 
     private CuratorFramework curatorClient = null;
+    private Publisher statusPublisher;
 
     private void init() throws ApplicationSettingsException {
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
@@ -66,6 +74,18 @@ public class PostWorkflowManager {
         return consumer;
     }
 
+    private String getExperimentIdByJobId(String jobId) throws Exception {
+        byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/experiment");
+        String process = new String(processBytes);
+        return process;
+    }
+
+    private String getTaskIdByJobId(String jobId) throws Exception {
+        byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/task");
+        String process = new String(processBytes);
+        return process;
+    }
+
     private String getProcessIdByJobId(String jobId) throws Exception {
         byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/process");
         String process = new String(processBytes);
@@ -101,6 +121,8 @@ public class PostWorkflowManager {
             if (hasMonitoringRegistered(jobStatusResult.getJobId())) {
                 String gateway = getGatewayByJobId(jobStatusResult.getJobId());
                 String processId = getProcessIdByJobId(jobStatusResult.getJobId());
+                String experimentId = getExperimentIdByJobId(jobStatusResult.getJobId());
+                String task = getTaskIdByJobId(jobStatusResult.getJobId());
                 String status = getStatusByJobId(jobStatusResult.getJobId());
 
                 logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id "
@@ -111,6 +133,8 @@ public class PostWorkflowManager {
 
                 } else {
 
+                    saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState());
+
                     if (jobStatusResult.getState() == JobState.COMPLETE) {
                         logger.info("Job " + jobStatusResult.getJobId() + " was completed");
 
@@ -151,6 +175,14 @@ public class PostWorkflowManager {
                                 }
                             }
                         }
+
+                        CompletingTask completingTask = new CompletingTask();
+                        completingTask.setGatewayId(experimentModel.getGatewayId());
+                        completingTask.setExperimentId(experimentModel.getExperimentId());
+                        completingTask.setProcessId(processModel.getProcessId());
+                        completingTask.setTaskId("Completing-Task");
+                        allTasks.add(completingTask);
+
                         WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster",
                                 "wm-23", ServerSettings.getZookeeperConnection());
 
@@ -189,6 +221,48 @@ public class PostWorkflowManager {
         }
     }
 
+    /**
+     * Stores a new status for a job in the experiment catalog and then publishes a
+     * job status change event through the status publisher.
+     *
+     * @param jobId        id of the job whose state changed
+     * @param taskId       task id; combined with {@code jobId} to key the status in the catalog
+     * @param processId    process id placed in the published job identifier
+     * @param experimentId experiment id placed in the published job identifier
+     * @param gateway      gateway id used to obtain the experiment catalog and to address the message
+     * @param jobState     the new state; its name is also recorded as the status reason
+     * @throws Exception if saving or publishing fails; the underlying failure is attached as the cause
+     */
+    public void saveAndPublishJobStatus(String jobId, String taskId, String processId, String experimentId, String gateway,
+                                        JobState jobState) throws Exception {
+        try {
+            JobStatus jobStatus = new JobStatus();
+            jobStatus.setReason(jobState.name());
+            // The state change time is stamped exactly once, when the status is built.
+            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            jobStatus.setJobState(jobState);
+
+            // Persist first so the catalog holds the status before listeners are notified.
+            CompositeIdentifier ids = new CompositeIdentifier(taskId, jobId);
+            ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway);
+            experimentCatalog.add(ExpCatChildDataType.JOB_STATUS, jobStatus, ids);
+
+            JobIdentifier identifier = new JobIdentifier(jobId, taskId, processId, experimentId, gateway);
+            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
+            MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB,
+                    AiravataUtils.getId(MessageType.JOB.name()), gateway);
+            msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            getStatusPublisher().publish(msgCtx);
+        } catch (Exception e) {
+            throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e);
+        }
+    }
+
+    /**
+     * Returns the publisher used for status messages, creating it on first use.
+     *
+     * <p>The {@code statusPublisher} field is not declared {@code volatile}, so it is
+     * read and written only while holding the lock; an unlocked read would not be
+     * guaranteed to see a fully initialised publisher.
+     *
+     * @return the publisher obtained from {@code MessagingFactory.getPublisher(Type.STATUS)}
+     * @throws AiravataException propagated from {@code MessagingFactory.getPublisher}
+     */
+    public Publisher getStatusPublisher() throws AiravataException {
+        synchronized (RabbitMQPublisher.class) {
+            if (statusPublisher == null) {
+                statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
+            }
+            return statusPublisher;
+        }
+    }
+
     public static void main(String[] args) throws Exception {
 
         PostWorkflowManager postManager = new PostWorkflowManager();
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
index 748a533..9f6d7b8 100644
--- a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
+++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
@@ -32,5 +32,6 @@ public class MessageProducer {
     public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException {
         final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(TOPIC, jobStatusResult);
         RecordMetadata recordMetadata = producer.send(record).get();
+        producer.flush(); // NOTE(review): send(record).get() above already waits for the send result, so this flush looks redundant - confirm
     }
 }

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.