You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/04/06 17:21:27 UTC

[airavata] branch staging updated: Removing redundant status publish mechanism over queue

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/staging by this push:
     new 76a5540  Removing redundant status publish mechanism over queue
76a5540 is described below

commit 76a55405d9b622a0e3ffd8cc8d550b6c72069868
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Sat Apr 6 13:21:17 2019 -0400

    Removing redundant status publish mechanism over queue
---
 .../airavata/helix/impl/task/AiravataTask.java     | 48 +++-------------------
 .../airavata/helix/impl/task/TaskContext.java      | 19 ---------
 .../impl/task/submission/JobSubmissionTask.java    |  4 +-
 3 files changed, 8 insertions(+), 63 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 4c8adad..0d5cfe1 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -189,41 +189,17 @@ public abstract class AiravataTask extends AbstractTask {
                 status.setTimeOfStateChange(status.getTimeOfStateChange());
             }
             getRegistryServiceClient().addProcessStatus(status, getProcessId());
-            ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
+            /*ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
             ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
             MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
                     AiravataUtils.getId(MessageType.PROCESS.name()), getGatewayId());
             msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-            getStatusPublisher().publish(msgCtx);
+            getStatusPublisher().publish(msgCtx);*/
         } catch (Exception e) {
             logger.error("Failed to save process status of process " + getProcessId(), e);
         }
     }
 
-    @SuppressWarnings("WeakerAccess")
-    protected void saveAndPublishTaskStatus() {
-        try {
-            TaskState state = getTaskContext().getTaskState();
-            // first we save job jobModel to the registry for sa and then save the job status.
-            TaskStatus status = getTaskContext().getTaskStatus();
-            if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){
-                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            }else {
-                status.setTimeOfStateChange(status.getTimeOfStateChange());
-            }
-            getRegistryServiceClient().addTaskStatus(status, getTaskId());
-            TaskIdentifier identifier = new TaskIdentifier(getTaskId(), getProcessId(), getExperimentId(), getGatewayId());
-            TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(state,
-                    identifier);
-            MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
-                    (MessageType.TASK.name()), getGatewayId());
-            msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-            getStatusPublisher().publish(msgCtx);
-        } catch (Exception e) {
-            logger.error("Failed to publist task status of task " + getTaskId());
-        }
-    }
-
 
     public void saveAndPublishJobStatus(String jobId, String taskId, String processId, String experimentId, String gateway,
                                          JobState jobState) throws Exception {
@@ -242,13 +218,13 @@ public abstract class AiravataTask extends AbstractTask {
 
             getRegistryServiceClient().addJobStatus(jobStatus, taskId, jobId);
 
-            JobIdentifier identifier = new JobIdentifier(jobId, taskId, processId, experimentId, gateway);
+            /*JobIdentifier identifier = new JobIdentifier(jobId, taskId, processId, experimentId, gateway);
 
             JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
             MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
                     (MessageType.JOB.name()), gateway);
             msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-            getStatusPublisher().publish(msgCtx);
+            getStatusPublisher().publish(msgCtx);*/
 
         } catch (Exception e) {
             logger.error("Error persisting job status " + e.getLocalizedMessage(), e);
@@ -322,17 +298,6 @@ public abstract class AiravataTask extends AbstractTask {
         }
     }
 
-    protected Publisher getStatusPublisher() throws AiravataException {
-        if (statusPublisher == null) {
-            synchronized (RabbitMQPublisher.class) {
-                if (statusPublisher == null) {
-                    statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
-                }
-            }
-        }
-        return statusPublisher;
-    }
-
     @Override
     public TaskResult onRun(TaskHelper helper) {
 
@@ -419,7 +384,6 @@ public abstract class AiravataTask extends AbstractTask {
             TaskContext.TaskContextBuilder taskContextBuilder = new TaskContext.TaskContextBuilder(getProcessId(), getGatewayId(), getTaskId())
                     .setRegistryClient(getRegistryServiceClient())
                     .setProcessModel(getProcessModel())
-                    .setStatusPublisher(getStatusPublisher())
                     .setGatewayResourceProfile(getRegistryServiceClient().getGatewayResourceProfile(gatewayId))
                     .setGatewayComputeResourcePreference(
                             getRegistryServiceClient().getGatewayComputeResourcePreference(gatewayId,
@@ -445,14 +409,14 @@ public abstract class AiravataTask extends AbstractTask {
             taskStatus.setState(ts);
             taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
             getRegistryServiceClient().addTaskStatus(taskStatus, getTaskId());
-            TaskIdentifier identifier = new TaskIdentifier(getTaskId(),
+            /*TaskIdentifier identifier = new TaskIdentifier(getTaskId(),
                     getProcessId(), getExperimentId(), getGatewayId());
             TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(ts,
                     identifier);
             MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
                     (MessageType.TASK.name()), getGatewayId());
             msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-            statusPublisher.publish(msgCtx);
+            statusPublisher.publish(msgCtx);*/
         } catch (Exception e) {
             logger.error("Failed to publish task status " + (ts != null ? ts.name(): "null") +" of task " + getTaskId());
         }
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 3192a59..6c4a1bb 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -59,7 +59,6 @@ public class TaskContext {
 
     private final static Logger logger = LoggerFactory.getLogger(TaskContext.class);
     // process model
-    private Publisher statusPublisher;
     private final String processId;
     private final String gatewayId;
     //private final String tokenId;
@@ -127,14 +126,6 @@ public class TaskContext {
         return taskId;
     }
 
-    public Publisher getStatusPublisher() {
-        return statusPublisher;
-    }
-
-    public void setStatusPublisher(Publisher statusPublisher) {
-        this.statusPublisher = statusPublisher;
-    }
-
     public ProcessModel getProcessModel() {
         return processModel;
     }
@@ -695,7 +686,6 @@ public class TaskContext {
         private final String gatewayId;
         private final String taskId;
         private RegistryService.Client registryClient;
-        private Publisher statusPublisher;
         private GatewayResourceProfile gatewayResourceProfile;
         private ComputeResourcePreference gatewayComputeResourcePreference;
         private StoragePreference gatewayStorageResourcePreference;
@@ -736,11 +726,6 @@ public class TaskContext {
             return this;
         }
 
-        public TaskContextBuilder setStatusPublisher(Publisher statusPublisher) {
-            this.statusPublisher = statusPublisher;
-            return this;
-        }
-
         public TaskContext build() throws Exception {
             if (notValid(gatewayResourceProfile)) {
                 throwError("Invalid GatewayResourceProfile");
@@ -757,13 +742,9 @@ public class TaskContext {
             if (notValid(registryClient)) {
                 throwError("Invalid Registry Client");
             }
-            if (notValid(statusPublisher)) {
-                throwError("Invalid Status Publisher");
-            }
 
             TaskContext ctx = new TaskContext(processId, gatewayId, taskId);
             ctx.setRegistryClient(registryClient);
-            ctx.setStatusPublisher(statusPublisher);
             ctx.setProcessModel(processModel);
             ctx.setGatewayResourceProfile(gatewayResourceProfile);
             ctx.setGatewayComputeResourcePreference(gatewayComputeResourcePreference);
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
index 0e085c3..5f9bda2 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
@@ -229,14 +229,14 @@ public abstract class JobSubmissionTask extends AiravataTask {
             }
 
             getRegistryServiceClient().addJobStatus(jobStatus, jobModel.getTaskId(), jobModel.getJobId());
-            JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), jobModel.getTaskId(),
+            /*JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), jobModel.getTaskId(),
                     getProcessId(), getProcessModel().getExperimentId(), getGatewayId());
 
             JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
             MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
                     (MessageType.JOB.name()), getGatewayId());
             msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-            getStatusPublisher().publish(msgCtx);
+            getStatusPublisher().publish(msgCtx);*/
         } catch (Exception e) {
             throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e);
         }