You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/04/06 17:21:27 UTC
[airavata] branch staging updated: Removing redundant status
publish mechanism over queue
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/staging by this push:
new 76a5540 Removing redundant status publish mechanism over queue
76a5540 is described below
commit 76a55405d9b622a0e3ffd8cc8d550b6c72069868
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Sat Apr 6 13:21:17 2019 -0400
Removing redundant status publish mechanism over queue
---
.../airavata/helix/impl/task/AiravataTask.java | 48 +++-------------------
.../airavata/helix/impl/task/TaskContext.java | 19 ---------
.../impl/task/submission/JobSubmissionTask.java | 4 +-
3 files changed, 8 insertions(+), 63 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 4c8adad..0d5cfe1 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -189,41 +189,17 @@ public abstract class AiravataTask extends AbstractTask {
status.setTimeOfStateChange(status.getTimeOfStateChange());
}
getRegistryServiceClient().addProcessStatus(status, getProcessId());
- ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
+ /*ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
AiravataUtils.getId(MessageType.PROCESS.name()), getGatewayId());
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);
+ getStatusPublisher().publish(msgCtx);*/
} catch (Exception e) {
logger.error("Failed to save process status of process " + getProcessId(), e);
}
}
- @SuppressWarnings("WeakerAccess")
- protected void saveAndPublishTaskStatus() {
- try {
- TaskState state = getTaskContext().getTaskState();
- // first we save job jobModel to the registry for sa and then save the job status.
- TaskStatus status = getTaskContext().getTaskStatus();
- if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- }else {
- status.setTimeOfStateChange(status.getTimeOfStateChange());
- }
- getRegistryServiceClient().addTaskStatus(status, getTaskId());
- TaskIdentifier identifier = new TaskIdentifier(getTaskId(), getProcessId(), getExperimentId(), getGatewayId());
- TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(state,
- identifier);
- MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
- (MessageType.TASK.name()), getGatewayId());
- msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);
- } catch (Exception e) {
- logger.error("Failed to publist task status of task " + getTaskId());
- }
- }
-
public void saveAndPublishJobStatus(String jobId, String taskId, String processId, String experimentId, String gateway,
JobState jobState) throws Exception {
@@ -242,13 +218,13 @@ public abstract class AiravataTask extends AbstractTask {
getRegistryServiceClient().addJobStatus(jobStatus, taskId, jobId);
- JobIdentifier identifier = new JobIdentifier(jobId, taskId, processId, experimentId, gateway);
+ /*JobIdentifier identifier = new JobIdentifier(jobId, taskId, processId, experimentId, gateway);
JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
(MessageType.JOB.name()), gateway);
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);
+ getStatusPublisher().publish(msgCtx);*/
} catch (Exception e) {
logger.error("Error persisting job status " + e.getLocalizedMessage(), e);
@@ -322,17 +298,6 @@ public abstract class AiravataTask extends AbstractTask {
}
}
- protected Publisher getStatusPublisher() throws AiravataException {
- if (statusPublisher == null) {
- synchronized (RabbitMQPublisher.class) {
- if (statusPublisher == null) {
- statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
- }
- }
- }
- return statusPublisher;
- }
-
@Override
public TaskResult onRun(TaskHelper helper) {
@@ -419,7 +384,6 @@ public abstract class AiravataTask extends AbstractTask {
TaskContext.TaskContextBuilder taskContextBuilder = new TaskContext.TaskContextBuilder(getProcessId(), getGatewayId(), getTaskId())
.setRegistryClient(getRegistryServiceClient())
.setProcessModel(getProcessModel())
- .setStatusPublisher(getStatusPublisher())
.setGatewayResourceProfile(getRegistryServiceClient().getGatewayResourceProfile(gatewayId))
.setGatewayComputeResourcePreference(
getRegistryServiceClient().getGatewayComputeResourcePreference(gatewayId,
@@ -445,14 +409,14 @@ public abstract class AiravataTask extends AbstractTask {
taskStatus.setState(ts);
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
getRegistryServiceClient().addTaskStatus(taskStatus, getTaskId());
- TaskIdentifier identifier = new TaskIdentifier(getTaskId(),
+ /*TaskIdentifier identifier = new TaskIdentifier(getTaskId(),
getProcessId(), getExperimentId(), getGatewayId());
TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(ts,
identifier);
MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
(MessageType.TASK.name()), getGatewayId());
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- statusPublisher.publish(msgCtx);
+ statusPublisher.publish(msgCtx);*/
} catch (Exception e) {
logger.error("Failed to publish task status " + (ts != null ? ts.name(): "null") +" of task " + getTaskId());
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 3192a59..6c4a1bb 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -59,7 +59,6 @@ public class TaskContext {
private final static Logger logger = LoggerFactory.getLogger(TaskContext.class);
// process model
- private Publisher statusPublisher;
private final String processId;
private final String gatewayId;
//private final String tokenId;
@@ -127,14 +126,6 @@ public class TaskContext {
return taskId;
}
- public Publisher getStatusPublisher() {
- return statusPublisher;
- }
-
- public void setStatusPublisher(Publisher statusPublisher) {
- this.statusPublisher = statusPublisher;
- }
-
public ProcessModel getProcessModel() {
return processModel;
}
@@ -695,7 +686,6 @@ public class TaskContext {
private final String gatewayId;
private final String taskId;
private RegistryService.Client registryClient;
- private Publisher statusPublisher;
private GatewayResourceProfile gatewayResourceProfile;
private ComputeResourcePreference gatewayComputeResourcePreference;
private StoragePreference gatewayStorageResourcePreference;
@@ -736,11 +726,6 @@ public class TaskContext {
return this;
}
- public TaskContextBuilder setStatusPublisher(Publisher statusPublisher) {
- this.statusPublisher = statusPublisher;
- return this;
- }
-
public TaskContext build() throws Exception {
if (notValid(gatewayResourceProfile)) {
throwError("Invalid GatewayResourceProfile");
@@ -757,13 +742,9 @@ public class TaskContext {
if (notValid(registryClient)) {
throwError("Invalid Registry Client");
}
- if (notValid(statusPublisher)) {
- throwError("Invalid Status Publisher");
- }
TaskContext ctx = new TaskContext(processId, gatewayId, taskId);
ctx.setRegistryClient(registryClient);
- ctx.setStatusPublisher(statusPublisher);
ctx.setProcessModel(processModel);
ctx.setGatewayResourceProfile(gatewayResourceProfile);
ctx.setGatewayComputeResourcePreference(gatewayComputeResourcePreference);
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
index 0e085c3..5f9bda2 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
@@ -229,14 +229,14 @@ public abstract class JobSubmissionTask extends AiravataTask {
}
getRegistryServiceClient().addJobStatus(jobStatus, jobModel.getTaskId(), jobModel.getJobId());
- JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), jobModel.getTaskId(),
+ /*JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), jobModel.getTaskId(),
getProcessId(), getProcessModel().getExperimentId(), getGatewayId());
JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
(MessageType.JOB.name()), getGatewayId());
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);
+ getStatusPublisher().publish(msgCtx);*/
} catch (Exception e) {
throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e);
}