You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/08 22:11:36 UTC
[airavata] branch helix-integration updated: Adding output file
details to experiment output
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/helix-integration by this push:
new bc0016f Adding output file details to experiment output
bc0016f is described below
commit bc0016f65dfb0146c92bbd76cc25cb93650748ea
Author: dimuthu <di...@gmail.com>
AuthorDate: Thu Mar 8 17:11:29 2018 -0500
Adding output file details to experiment output
---
.../airavata/helix/impl/task/AiravataTask.java | 74 +++++++++++++++++++---
.../impl/task/staging/OutputDataStagingTask.java | 9 ++-
.../helix/impl/workflow/PostWorkflowManager.java | 1 +
.../apache/airavata/helix/core/util/TaskUtil.java | 9 ++-
4 files changed, 79 insertions(+), 14 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 3ad8632..183b4e7 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -32,7 +32,10 @@ import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.Type;
import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.data.replica.*;
+import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.status.*;
@@ -46,6 +49,7 @@ import org.apache.log4j.MDC;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.List;
public abstract class AiravataTask extends AbstractTask {
@@ -68,11 +72,16 @@ public abstract class AiravataTask extends AbstractTask {
@TaskParam(name = "gatewayId")
private String gatewayId;
+ @TaskParam(name = "Skip Status Publish")
+ private boolean skipTaskStatusPublish = false;
+
@TaskOutPort(name = "Next Task")
private OutPort nextTask;
protected TaskResult onSuccess(String message) {
- publishTaskState(TaskState.COMPLETED);
+ if (!skipTaskStatusPublish) {
+ publishTaskState(TaskState.COMPLETED);
+ }
String successMessage = "Task " + getTaskId() + " completed." + (message != null ? " Message : " + message : "");
logger.info(successMessage);
return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
@@ -104,11 +113,13 @@ public abstract class AiravataTask extends AbstractTask {
errorModel.setActualErrorMessage(errors.toString());
errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
- publishTaskState(TaskState.FAILED);
- saveAndPublishProcessStatus();
- saveExperimentError(errorModel);
- saveProcessError(errorModel);
- saveTaskError(errorModel);
+ if (!skipTaskStatusPublish) {
+ publishTaskState(TaskState.FAILED);
+ saveAndPublishProcessStatus();
+ saveExperimentError(errorModel);
+ saveProcessError(errorModel);
+ saveTaskError(errorModel);
+ }
return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, errorMessage);
}
@@ -164,6 +175,41 @@ public abstract class AiravataTask extends AbstractTask {
}
}
+ public void saveExperimentOutput(String outputName, String outputVal) throws TaskOnFailException {
+ try {
+ ExperimentModel experiment = (ExperimentModel)experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+ List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs();
+ if (experimentOutputs != null && !experimentOutputs.isEmpty()){
+ for (OutputDataObjectType expOutput : experimentOutputs){
+ if (expOutput.getName().equals(outputName)){
+ DataProductModel dataProductModel = new DataProductModel();
+ dataProductModel.setGatewayId(getGatewayId());
+ dataProductModel.setOwnerName(getProcessModel().getUserName());
+ dataProductModel.setProductName(outputName);
+ dataProductModel.setDataProductType(DataProductType.FILE);
+
+ DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel();
+ replicaLocationModel.setStorageResourceId(getTaskContext().getStorageResource().getStorageResourceId());
+ replicaLocationModel.setReplicaName(outputName + " gateway data store copy");
+ replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
+ replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
+ replicaLocationModel.setFilePath(outputVal);
+ dataProductModel.addToReplicaLocations(replicaLocationModel);
+
+ ReplicaCatalog replicaCatalog = RegistryFactory.getReplicaCatalog();
+ String productUri = replicaCatalog.registerDataProduct(dataProductModel);
+ expOutput.setValue(productUri);
+ }
+ }
+ }
+ experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, experimentId);
+
+ } catch (RegistryException | AppCatalogException e) {
+ String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " : - Error while updating experiment outputs";
+ throw new TaskOnFailException(msg, true, e);
+ }
+ }
+
@SuppressWarnings("WeakerAccess")
protected void saveExperimentError(ErrorModel errorModel) {
try {
@@ -218,7 +264,9 @@ public abstract class AiravataTask extends AbstractTask {
MDC.put("process", getProcessId());
MDC.put("gateway", getGatewayId());
MDC.put("task", getTaskId());
- publishTaskState(TaskState.EXECUTING);
+ if (!skipTaskStatusPublish) {
+ publishTaskState(TaskState.EXECUTING);
+ }
return onRun(helper, getTaskContext());
} finally {
MDC.clear();
@@ -234,7 +282,9 @@ public abstract class AiravataTask extends AbstractTask {
MDC.put("process", getProcessId());
MDC.put("gateway", getGatewayId());
MDC.put("task", getTaskId());
- publishTaskState(TaskState.CANCELED);
+ if (!skipTaskStatusPublish) {
+ publishTaskState(TaskState.CANCELED);
+ }
onCancel(getTaskContext());
} finally {
MDC.clear();
@@ -351,4 +401,12 @@ public abstract class AiravataTask extends AbstractTask {
public void setNextTask(OutPort nextTask) {
this.nextTask = nextTask;
}
+
+ public void setSkipTaskStatusPublish(boolean skipTaskStatusPublish) {
+ this.skipTaskStatusPublish = skipTaskStatusPublish;
+ }
+
+ public boolean isSkipTaskStatusPublish() {
+ return skipTaskStatusPublish;
+ }
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 3f4fe89..2eddc47 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -166,13 +166,14 @@ public class OutputDataStagingTask extends DataStagingTask {
}
logger.info("Transferring file " + sourceFileName);
- transferFile(sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
+ transferFile(processOutput.getName(), sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
}
return onSuccess("Output data staging task " + getTaskId() + " successfully completed");
} else {
// Downloading input file from the storage resource
- transferFile(sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
+ assert processOutput != null;
+ transferFile(processOutput.getName(), sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
return onSuccess("Output data staging task " + getTaskId() + " successfully completed");
}
@@ -190,7 +191,7 @@ public class OutputDataStagingTask extends DataStagingTask {
}
}
- private void transferFile(URI sourceURI, URI destinationURI, String fileName, AgentAdaptor adaptor,
+ private void transferFile(String outputName, URI sourceURI, URI destinationURI, String fileName, AgentAdaptor adaptor,
StorageResourceAdaptor storageResourceAdaptor) throws TaskOnFailException {
String localSourceFilePath = getLocalDataPath(fileName);
@@ -212,6 +213,8 @@ public class OutputDataStagingTask extends DataStagingTask {
throw new TaskOnFailException("Failed uploading the output file to " + destinationURI.getPath() + " from local path " +
localSourceFilePath, true, e);
}
+
+ saveExperimentOutput(outputName, destinationURI.toString());
}
@Override
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index c8ef45c..77e753c 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -189,6 +189,7 @@ public class PostWorkflowManager {
completingTask.setExperimentId(experimentModel.getExperimentId());
completingTask.setProcessId(processModel.getProcessId());
completingTask.setTaskId("Completing-Task");
+ completingTask.setSkipTaskStatusPublish(true);
if (allTasks.size() > 0) {
allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(completingTask.getTaskId(), completingTask));
}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
index a006c58..f58b365 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
@@ -99,11 +99,14 @@ public class TaskUtil {
classField.setAccessible(true);
if (classField.getType().isAssignableFrom(String.class)) {
classField.set(instance, params.get(param.name()));
- } else if (classField.getType().isAssignableFrom(Integer.class)) {
+ } else if (classField.getType().isAssignableFrom(Integer.class) ||
+ classField.getType().isAssignableFrom(Integer.TYPE)) {
classField.set(instance, Integer.parseInt(params.get(param.name())));
- } else if (classField.getType().isAssignableFrom(Long.class)) {
+ } else if (classField.getType().isAssignableFrom(Long.class) ||
+ classField.getType().isAssignableFrom(Long.TYPE)) {
classField.set(instance, Long.parseLong(params.get(param.name())));
- } else if (classField.getType().isAssignableFrom(Boolean.class)) {
+ } else if (classField.getType().isAssignableFrom(Boolean.class) ||
+ classField.getType().isAssignableFrom(Boolean.TYPE)) {
classField.set(instance, Boolean.parseBoolean(params.get(param.name())));
}
}
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.