You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:10:07 UTC
[airavata] 10/17: Improving status publishing
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 1c3a5d4eca7a3fd455adea1ce56437ec21499bd7
Author: dimuthu <di...@gmail.com>
AuthorDate: Mon Mar 5 08:46:53 2018 -0500
Improving status publishing
---
.../helix/impl/participant/GlobalParticipant.java | 1 +
.../airavata/helix/impl/task/AiravataTask.java | 136 ++++++++++++++++++---
.../airavata/helix/impl/task/CompletingTask.java | 26 ++++
.../airavata/helix/impl/task/EnvSetupTask.java | 3 +
.../helix/impl/task/InputDataStagingTask.java | 7 +-
.../helix/impl/task/OutputDataStagingTask.java | 3 +
.../airavata/helix/impl/task/TaskContext.java | 16 +++
.../submission/task/DefaultJobSubmissionTask.java | 15 +--
.../submission/task/ForkJobSubmissionTask.java | 2 +-
.../task/submission/task/JobSubmissionTask.java | 43 +------
.../submission/task/LocalJobSubmissionTask.java | 4 +-
.../helix/impl/workflow/PostWorkflowManager.java | 88 +++++++++++--
.../job/monitor/kafka/MessageProducer.java | 1 +
13 files changed, 266 insertions(+), 79 deletions(-)
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index 984b277..fc3fbcb 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -18,6 +18,7 @@ public class GlobalParticipant extends HelixParticipant {
"org.apache.airavata.helix.impl.task.EnvSetupTask",
"org.apache.airavata.helix.impl.task.InputDataStagingTask",
"org.apache.airavata.helix.impl.task.OutputDataStagingTask",
+ "org.apache.airavata.helix.impl.task.CompletingTask",
"org.apache.airavata.helix.impl.task.submission.task.ForkJobSubmissionTask",
"org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask",
"org.apache.airavata.helix.impl.task.submission.task.LocalJobSubmissionTask"
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index e15195d..03dedf3 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -1,25 +1,21 @@
package org.apache.airavata.helix.impl.task;
+import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.helix.core.AbstractTask;
import org.apache.airavata.helix.core.OutPort;
import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
import org.apache.airavata.helix.task.api.annotation.TaskParam;
import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.Type;
+import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
-import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
-import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
-import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
-import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.airavata.model.messaging.event.TaskIdentifier;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.process.ProcessModel;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.status.*;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.*;
import org.apache.helix.HelixManager;
@@ -27,7 +23,8 @@ import org.apache.helix.task.TaskResult;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import java.util.*;
+import java.io.PrintWriter;
+import java.io.StringWriter;
public abstract class AiravataTask extends AbstractTask {
@@ -61,17 +58,128 @@ public abstract class AiravataTask extends AbstractTask {
}
protected TaskResult onFail(String reason, boolean fatal, Throwable error) {
+
String errorMessage;
+ ProcessStatus status = new ProcessStatus(ProcessState.FAILED);
+ StringWriter errors = new StringWriter();
if (error == null) {
errorMessage = "Task " + getTaskId() + " failed due to " + reason;
+ errors.write(errorMessage);
+ status.setReason(errorMessage);
logger.error(errorMessage);
+
} else {
errorMessage = "Task " + getTaskId() + " failed due to " + reason + ", " + error.getMessage();
+ status.setReason(errorMessage);
+ error.printStackTrace(new PrintWriter(errors));
logger.error(errorMessage, error);
}
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ getTaskContext().setProcessStatus(status);
+
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setUserFriendlyMessage("GFac Worker throws an exception");
+ errorModel.setActualErrorMessage(errors.toString());
+ errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+
+ saveAndPublishProcessStatus();
+ saveExperimentError(errorModel);
+ saveProcessError(errorModel);
return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, errorMessage);
+ }
+ public void saveAndPublishProcessStatus(ProcessState state) {
+ ProcessStatus processStatus = new ProcessStatus(state);
+ processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ getTaskContext().setProcessStatus(processStatus);
+ saveAndPublishProcessStatus();
+ }
+
+ public void saveAndPublishProcessStatus() {
+ try {
+ ProcessStatus status = taskContext.getProcessStatus();
+ if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ }else {
+ status.setTimeOfStateChange(status.getTimeOfStateChange());
+ }
+ experimentCatalog.add(ExpCatChildDataType.PROCESS_STATUS, status, getProcessId());
+ ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
+ ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
+ MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+ AiravataUtils.getId(MessageType.PROCESS.name()), getGatewayId());
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ getStatusPublisher().publish(msgCtx);
+ } catch (Exception e) {
+ logger.error("Failed to save process status of process " + getProcessId(), e);
+ }
+ }
+
+ public void saveAndPublishTaskStatus() {
+ try {
+ TaskState state = getTaskContext().getTaskState();
+ // first we save job jobModel to the registry for sa and then save the job status.
+ TaskStatus status = getTaskContext().getTaskStatus();
+ if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ }else {
+ status.setTimeOfStateChange(status.getTimeOfStateChange());
+ }
+ experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, status, getTaskId());
+ TaskIdentifier identifier = new TaskIdentifier(getTaskId(), getProcessId(), getExperimentId(), getGatewayId());
+ TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(state,
+ identifier);
+ MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
+ (MessageType.TASK.name()), getGatewayId());
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ getStatusPublisher().publish(msgCtx);
+ } catch (Exception e) {
+ logger.error("Failed to publist task status of task " + getTaskId());
+ }
+ }
+
+ public void saveExperimentError(ErrorModel errorModel) {
+ try {
+ errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
+ getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, experimentId);
+ } catch (RegistryException e) {
+ String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " : - Error while updating experiment errors";
+ logger.error(msg, e);
+ }
+ }
+
+ public void saveProcessError(ErrorModel errorModel) {
+ try {
+ errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
+ experimentCatalog.add(ExpCatChildDataType.PROCESS_ERROR, errorModel, getProcessId());
+ } catch (RegistryException e) {
+ String msg = "expId: " + getExperimentId() + " processId: " + getProcessId()
+ + " : - Error while updating process errors";
+ logger.error(msg, e);
+ }
+ }
+
+ public void saveTaskError(ErrorModel errorModel) throws Exception {
+ try {
+ errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR"));
+ getExperimentCatalog().add(ExpCatChildDataType.TASK_ERROR, errorModel, getTaskId());
+ } catch (RegistryException e) {
+ String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " taskId: " + getTaskId()
+ + " : - Error while updating task errors";
+ throw new Exception(msg, e);
+ }
+ }
+
+ public Publisher getStatusPublisher() throws AiravataException {
+ if (statusPublisher == null) {
+ synchronized (RabbitMQPublisher.class) {
+ if (statusPublisher == null) {
+ statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
+ }
+ }
+ }
+ return statusPublisher;
}
@Override
@@ -145,10 +253,6 @@ public abstract class AiravataTask extends AbstractTask {
return experimentCatalog;
}
- public Publisher getStatusPublisher() {
- return statusPublisher;
- }
-
public String getProcessId() {
return processId;
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java
new file mode 100644
index 0000000..9ec2909
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java
@@ -0,0 +1,26 @@
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+@TaskDef(name = "Completing Task")
+public class CompletingTask extends AiravataTask {
+
+ private static final Logger logger = LogManager.getLogger(CompletingTask.class);
+
+ @Override
+ public TaskResult onRun(TaskHelper helper) {
+ logger.info("Process " + getProcessId() + " successfully completed");
+ saveAndPublishProcessStatus(ProcessState.COMPLETED);
+ return onSuccess("Process " + getProcessId() + " successfully completed");
+ }
+
+ @Override
+ public void onCancel() {
+
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
index ddba5f2..abdc1bf 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
@@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.helix.task.TaskResult;
@@ -17,6 +18,8 @@ public class EnvSetupTask extends AiravataTask {
@Override
public TaskResult onRun(TaskHelper taskHelper) {
try {
+
+ saveAndPublishProcessStatus(ProcessState.CONFIGURING_WORKSPACE);
publishTaskState(TaskState.EXECUTING);
AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
getTaskContext().getGatewayId(),
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java
index 30eeec0..ed143dd 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java
@@ -3,20 +3,17 @@ package org.apache.airavata.helix.impl.task;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.AgentException;
import org.apache.airavata.agents.api.StorageResourceAdaptor;
-import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.task.DataStagingTaskModel;
-import org.apache.airavata.registry.cpi.AppCatalogException;
-import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskResult;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.File;
-import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -29,6 +26,8 @@ public class InputDataStagingTask extends DataStagingTask {
public TaskResult onRun(TaskHelper taskHelper) {
logger.info("Starting Input Data Staging Task " + getTaskId());
+ saveAndPublishProcessStatus(ProcessState.INPUT_DATA_STAGING);
+
try {
// Get and validate data staging task model
DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel();
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
index f33523c..ff8fd2e 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
@@ -7,6 +7,7 @@ import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.task.DataStagingTaskModel;
import org.apache.airavata.registry.cpi.ExpCatChildDataType;
import org.apache.airavata.registry.cpi.RegistryException;
@@ -29,6 +30,8 @@ public class OutputDataStagingTask extends DataStagingTask {
public TaskResult onRun(TaskHelper taskHelper) {
logger.info("Starting output data staging task " + getTaskId());
+ saveAndPublishProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
+
try {
// Get and validate data staging task model
DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel();
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 489a196..6be1d36 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -21,6 +21,8 @@ import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.TaskModel;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.AppCatalogException;
@@ -385,6 +387,20 @@ public class TaskContext {
return null;
}
+ public TaskState getTaskState() {
+ if(getCurrentTaskModel().getTaskStatuses() != null)
+ return getCurrentTaskModel().getTaskStatuses().get(0).getState();
+ else
+ return null;
+ }
+
+ public TaskStatus getTaskStatus() {
+ if(getCurrentTaskModel().getTaskStatuses() != null)
+ return getCurrentTaskModel().getTaskStatuses().get(0);
+ else
+ return null;
+ }
+
public String getComputeResourceId() {
if (isUseUserCRPref() &&
userComputeResourcePreference != null &&
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
index 31b6f30..688f894 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -5,24 +5,18 @@ import org.apache.airavata.agents.api.JobSubmissionOutput;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.helix.impl.task.submission.GroovyMapBuilder;
import org.apache.airavata.helix.impl.task.submission.GroovyMapData;
-import org.apache.airavata.helix.impl.task.submission.SubmissionUtil;
import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.status.JobState;
-import org.apache.airavata.model.status.JobStatus;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.status.*;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
-import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskResult;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -38,6 +32,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
public TaskResult onRun(TaskHelper taskHelper) {
try {
+ saveAndPublishProcessStatus(ProcessState.EXECUTING);
GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build();
@@ -133,14 +128,14 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatuses(Arrays.asList(jobStatus));
- saveJobStatus(jobModel);
+ saveAndPublishJobStatus(jobModel);
if (verifyJobSubmissionByJobId(adaptor, jobId)) {
jobStatus.setJobState(JobState.QUEUED);
jobStatus.setReason("Verification step succeeded");
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatuses(Arrays.asList(jobStatus));
- saveJobStatus(jobModel);
+ saveAndPublishJobStatus(jobModel);
createMonitoringNode(jobId);
}
@@ -172,7 +167,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
jobStatus.setReason("Verification step succeeded");
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatuses(Arrays.asList(jobStatus));
- saveJobStatus(jobModel);
+ saveAndPublishJobStatus(jobModel);
//taskStatus.setState(TaskState.COMPLETED);
//taskStatus.setReason("Submitted job to compute resource");
//taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
index 2e4a052..e3b5447 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
@@ -58,7 +58,7 @@ public class ForkJobSubmissionTask extends JobSubmissionTask {
jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatuses(Arrays.asList(jobStatus));
- saveJobStatus(jobModel);
+ saveAndPublishJobStatus(jobModel);
return null;
} else {
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
index afa2630..4fed22d 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
@@ -73,6 +73,8 @@ public abstract class JobSubmissionTask extends AiravataTask {
this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/lock", new byte[0]);
this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/gateway", getGatewayId().getBytes());
this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/process", getProcessId().getBytes());
+ this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/task", getTaskId().getBytes());
+ this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/experiment", getExperimentId().getBytes());
this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/status", "pending".getBytes());
}
@@ -146,48 +148,11 @@ public abstract class JobSubmissionTask extends AiravataTask {
return jobManagerConfiguration.getParser().parseJobId(jobName, commandOutput.getStdOut());
}
- ////////////////////////////////
-
-
- /////////////////////////////////////////////
- public void saveExperimentError(ErrorModel errorModel) throws Exception {
- try {
- errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
- getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, getExperimentId());
- } catch (RegistryException e) {
- String msg = "expId: " + getExperimentId() + " processId: " + getProcessId()
- + " : - Error while updating experiment errors";
- throw new Exception(msg, e);
- }
- }
-
- public void saveProcessError(ErrorModel errorModel) throws Exception {
- try {
- errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
- getExperimentCatalog().add(ExpCatChildDataType.PROCESS_ERROR, errorModel, getProcessId());
- } catch (RegistryException e) {
- String msg = "expId: " + getExperimentId() + " processId: " + getProcessId()
- + " : - Error while updating process errors";
- throw new Exception(msg, e);
- }
- }
-
- public void saveTaskError(ErrorModel errorModel) throws Exception {
- try {
- errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR"));
- getExperimentCatalog().add(ExpCatChildDataType.TASK_ERROR, errorModel, getTaskId());
- } catch (RegistryException e) {
- String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " taskId: " + getTaskId()
- + " : - Error while updating task errors";
- throw new Exception(msg, e);
- }
- }
-
public void saveJobModel(JobModel jobModel) throws RegistryException {
getExperimentCatalog().add(ExpCatChildDataType.JOB, jobModel, getProcessId());
}
- public void saveJobStatus(JobModel jobModel) throws Exception {
+ public void saveAndPublishJobStatus(JobModel jobModel) throws Exception {
try {
// first we save job jobModel to the registry for sa and then save the job status.
JobStatus jobStatus = null;
@@ -213,7 +178,7 @@ public abstract class JobSubmissionTask extends AiravataTask {
MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
(MessageType.JOB.name()), getGatewayId());
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- //getStatusPublisher().publish(msgCtx);
+ getStatusPublisher().publish(msgCtx);
} catch (Exception e) {
throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e);
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
index e3ae4fa..cea6750 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
@@ -58,7 +58,7 @@ public class LocalJobSubmissionTask extends JobSubmissionTask {
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatuses(Arrays.asList(jobStatus));
- saveJobStatus(jobModel);
+ saveAndPublishJobStatus(jobModel);
jobModel.setExitCode(submissionOutput.getExitCode());
jobModel.setStdErr(submissionOutput.getStdErr());
@@ -69,7 +69,7 @@ public class LocalJobSubmissionTask extends JobSubmissionTask {
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatuses(Arrays.asList(jobStatus));
- saveJobStatus(jobModel);
+ saveAndPublishJobStatus(jobModel);
return null;
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 383fe37..07a9aee 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -1,26 +1,33 @@
package org.apache.airavata.helix.impl.workflow;
+import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.core.OutPort;
-import org.apache.airavata.helix.impl.task.AiravataTask;
-import org.apache.airavata.helix.impl.task.EnvSetupTask;
-import org.apache.airavata.helix.impl.task.InputDataStagingTask;
-import org.apache.airavata.helix.impl.task.OutputDataStagingTask;
+import org.apache.airavata.helix.impl.task.*;
import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask;
import org.apache.airavata.helix.impl.task.submission.task.JobSubmissionTask;
import org.apache.airavata.helix.workflow.WorkflowManager;
import org.apache.airavata.job.monitor.kafka.JobStatusResultDeserializer;
import org.apache.airavata.job.monitor.parser.JobStatusResult;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.Type;
+import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.model.task.TaskModel;
import org.apache.airavata.model.task.TaskTypes;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.registry.cpi.ExperimentCatalog;
-import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.*;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -46,6 +53,7 @@ public class PostWorkflowManager {
private final String TOPIC = "parsed-data";
private CuratorFramework curatorClient = null;
+ private Publisher statusPublisher;
private void init() throws ApplicationSettingsException {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
@@ -66,6 +74,18 @@ public class PostWorkflowManager {
return consumer;
}
+ private String getExperimentIdByJobId(String jobId) throws Exception {
+ byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/experiment");
+ String process = new String(processBytes);
+ return process;
+ }
+
+ private String getTaskIdByJobId(String jobId) throws Exception {
+ byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/task");
+ String process = new String(processBytes);
+ return process;
+ }
+
private String getProcessIdByJobId(String jobId) throws Exception {
byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/process");
String process = new String(processBytes);
@@ -101,6 +121,8 @@ public class PostWorkflowManager {
if (hasMonitoringRegistered(jobStatusResult.getJobId())) {
String gateway = getGatewayByJobId(jobStatusResult.getJobId());
String processId = getProcessIdByJobId(jobStatusResult.getJobId());
+ String experimentId = getExperimentIdByJobId(jobStatusResult.getJobId());
+ String task = getTaskIdByJobId(jobStatusResult.getJobId());
String status = getStatusByJobId(jobStatusResult.getJobId());
logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id "
@@ -111,6 +133,8 @@ public class PostWorkflowManager {
} else {
+ saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState());
+
if (jobStatusResult.getState() == JobState.COMPLETE) {
logger.info("Job " + jobStatusResult.getJobId() + " was completed");
@@ -151,6 +175,14 @@ public class PostWorkflowManager {
}
}
}
+
+ CompletingTask completingTask = new CompletingTask();
+ completingTask.setGatewayId(experimentModel.getGatewayId());
+ completingTask.setExperimentId(experimentModel.getExperimentId());
+ completingTask.setProcessId(processModel.getProcessId());
+ completingTask.setTaskId("Completing-Task");
+ allTasks.add(completingTask);
+
WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster",
"wm-23", ServerSettings.getZookeeperConnection());
@@ -189,6 +221,48 @@ public class PostWorkflowManager {
}
}
+ public void saveAndPublishJobStatus(String jobId, String taskId, String processId, String experimentId, String gateway,
+ JobState jobState) throws Exception {
+ try {
+
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setReason(jobState.name());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobStatus.setJobState(jobState);
+
+ if (jobStatus.getTimeOfStateChange() == 0 || jobStatus.getTimeOfStateChange() > 0 ) {
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ } else {
+ jobStatus.setTimeOfStateChange(jobStatus.getTimeOfStateChange());
+ }
+
+ CompositeIdentifier ids = new CompositeIdentifier(taskId, jobId);
+ ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway);
+ experimentCatalog.add(ExpCatChildDataType.JOB_STATUS, jobStatus, ids);
+ JobIdentifier identifier = new JobIdentifier(jobId, taskId,
+ processId, experimentId, gateway);
+
+ JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
+ MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
+ (MessageType.JOB.name()), gateway);
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ getStatusPublisher().publish(msgCtx);
+ } catch (Exception e) {
+ throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e);
+ }
+ }
+
+ public Publisher getStatusPublisher() throws AiravataException {
+ if (statusPublisher == null) {
+ synchronized (RabbitMQPublisher.class) {
+ if (statusPublisher == null) {
+ statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
+ }
+ }
+ }
+ return statusPublisher;
+ }
+
public static void main(String[] args) throws Exception {
PostWorkflowManager postManager = new PostWorkflowManager();
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
index 748a533..9f6d7b8 100644
--- a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
+++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
@@ -32,5 +32,6 @@ public class MessageProducer {
public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException {
final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(TOPIC, jobStatusResult);
RecordMetadata recordMetadata = producer.send(record).get();
+ producer.flush();
}
}
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.