You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/08 22:11:36 UTC

[airavata] branch helix-integration updated: Adding output file details to experiment output

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/helix-integration by this push:
     new bc0016f  Adding output file details to experiment output
bc0016f is described below

commit bc0016f65dfb0146c92bbd76cc25cb93650748ea
Author: dimuthu <di...@gmail.com>
AuthorDate: Thu Mar 8 17:11:29 2018 -0500

    Adding output file details to experiment output
---
 .../airavata/helix/impl/task/AiravataTask.java     | 74 +++++++++++++++++++---
 .../impl/task/staging/OutputDataStagingTask.java   |  9 ++-
 .../helix/impl/workflow/PostWorkflowManager.java   |  1 +
 .../apache/airavata/helix/core/util/TaskUtil.java  |  9 ++-
 4 files changed, 79 insertions(+), 14 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 3ad8632..183b4e7 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -32,7 +32,10 @@ import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.Type;
 import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.data.replica.*;
+import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.status.*;
@@ -46,6 +49,7 @@ import org.apache.log4j.MDC;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.List;
 
 public abstract class AiravataTask extends AbstractTask {
 
@@ -68,11 +72,16 @@ public abstract class AiravataTask extends AbstractTask {
     @TaskParam(name = "gatewayId")
     private String gatewayId;
 
+    @TaskParam(name = "Skip Status Publish")
+    private boolean skipTaskStatusPublish = false;
+
     @TaskOutPort(name = "Next Task")
     private OutPort nextTask;
 
     protected TaskResult onSuccess(String message) {
-        publishTaskState(TaskState.COMPLETED);
+        if (!skipTaskStatusPublish) {
+            publishTaskState(TaskState.COMPLETED);
+        }
         String successMessage = "Task " + getTaskId() + " completed." + (message != null ? " Message : " + message : "");
         logger.info(successMessage);
         return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
@@ -104,11 +113,13 @@ public abstract class AiravataTask extends AbstractTask {
         errorModel.setActualErrorMessage(errors.toString());
         errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
 
-        publishTaskState(TaskState.FAILED);
-        saveAndPublishProcessStatus();
-        saveExperimentError(errorModel);
-        saveProcessError(errorModel);
-        saveTaskError(errorModel);
+        if (!skipTaskStatusPublish) {
+            publishTaskState(TaskState.FAILED);
+            saveAndPublishProcessStatus();
+            saveExperimentError(errorModel);
+            saveProcessError(errorModel);
+            saveTaskError(errorModel);
+        }
         return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, errorMessage);
     }
 
@@ -164,6 +175,41 @@ public abstract class AiravataTask extends AbstractTask {
         }
     }
 
+    public void saveExperimentOutput(String outputName, String outputVal) throws TaskOnFailException {
+        try {
+            ExperimentModel experiment = (ExperimentModel)experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+            List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs();
+            if (experimentOutputs != null && !experimentOutputs.isEmpty()){
+                for (OutputDataObjectType expOutput : experimentOutputs){
+                    if (expOutput.getName().equals(outputName)){
+                        DataProductModel dataProductModel = new DataProductModel();
+                        dataProductModel.setGatewayId(getGatewayId());
+                        dataProductModel.setOwnerName(getProcessModel().getUserName());
+                        dataProductModel.setProductName(outputName);
+                        dataProductModel.setDataProductType(DataProductType.FILE);
+
+                        DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel();
+                        replicaLocationModel.setStorageResourceId(getTaskContext().getStorageResource().getStorageResourceId());
+                        replicaLocationModel.setReplicaName(outputName + " gateway data store copy");
+                        replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
+                        replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
+                        replicaLocationModel.setFilePath(outputVal);
+                        dataProductModel.addToReplicaLocations(replicaLocationModel);
+
+                        ReplicaCatalog replicaCatalog = RegistryFactory.getReplicaCatalog();
+                        String productUri = replicaCatalog.registerDataProduct(dataProductModel);
+                        expOutput.setValue(productUri);
+                    }
+                }
+            }
+            experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, experimentId);
+
+        } catch (RegistryException | AppCatalogException e) {
+            String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " : - Error while updating experiment outputs";
+            throw new TaskOnFailException(msg, true, e);
+        }
+    }
+
     @SuppressWarnings("WeakerAccess")
     protected void saveExperimentError(ErrorModel errorModel) {
         try {
@@ -218,7 +264,9 @@ public abstract class AiravataTask extends AbstractTask {
             MDC.put("process", getProcessId());
             MDC.put("gateway", getGatewayId());
             MDC.put("task", getTaskId());
-            publishTaskState(TaskState.EXECUTING);
+            if (!skipTaskStatusPublish) {
+                publishTaskState(TaskState.EXECUTING);
+            }
             return onRun(helper, getTaskContext());
         } finally {
             MDC.clear();
@@ -234,7 +282,9 @@ public abstract class AiravataTask extends AbstractTask {
             MDC.put("process", getProcessId());
             MDC.put("gateway", getGatewayId());
             MDC.put("task", getTaskId());
-            publishTaskState(TaskState.CANCELED);
+            if (!skipTaskStatusPublish) {
+                publishTaskState(TaskState.CANCELED);
+            }
             onCancel(getTaskContext());
         } finally {
             MDC.clear();
@@ -351,4 +401,12 @@ public abstract class AiravataTask extends AbstractTask {
     public void setNextTask(OutPort nextTask) {
         this.nextTask = nextTask;
     }
+
+    public void setSkipTaskStatusPublish(boolean skipTaskStatusPublish) {
+        this.skipTaskStatusPublish = skipTaskStatusPublish;
+    }
+
+    public boolean isSkipTaskStatusPublish() {
+        return skipTaskStatusPublish;
+    }
 }
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 3f4fe89..2eddc47 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -166,13 +166,14 @@ public class OutputDataStagingTask extends DataStagingTask {
                     }
 
                     logger.info("Transferring file " + sourceFileName);
-                    transferFile(sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
+                    transferFile(processOutput.getName(), sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
                 }
                 return onSuccess("Output data staging task " + getTaskId() + " successfully completed");
 
             } else {
                 // Downloading input file from the storage resource
-                transferFile(sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
+                assert processOutput != null;
+                transferFile(processOutput.getName(), sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
                 return onSuccess("Output data staging task " + getTaskId() + " successfully completed");
             }
 
@@ -190,7 +191,7 @@ public class OutputDataStagingTask extends DataStagingTask {
         }
     }
 
-    private void transferFile(URI sourceURI, URI destinationURI, String fileName, AgentAdaptor adaptor,
+    private void transferFile(String outputName, URI sourceURI, URI destinationURI, String fileName, AgentAdaptor adaptor,
                               StorageResourceAdaptor storageResourceAdaptor) throws TaskOnFailException {
         String localSourceFilePath = getLocalDataPath(fileName);
 
@@ -212,6 +213,8 @@ public class OutputDataStagingTask extends DataStagingTask {
             throw new TaskOnFailException("Failed uploading the output file to " + destinationURI.getPath() + " from local path " +
                     localSourceFilePath, true, e);
         }
+
+        saveExperimentOutput(outputName, destinationURI.toString());
     }
 
     @Override
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index c8ef45c..77e753c 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -189,6 +189,7 @@ public class PostWorkflowManager {
                         completingTask.setExperimentId(experimentModel.getExperimentId());
                         completingTask.setProcessId(processModel.getProcessId());
                         completingTask.setTaskId("Completing-Task");
+                        completingTask.setSkipTaskStatusPublish(true);
                         if (allTasks.size() > 0) {
                             allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(completingTask.getTaskId(), completingTask));
                         }
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
index a006c58..f58b365 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
@@ -99,11 +99,14 @@ public class TaskUtil {
                     classField.setAccessible(true);
                     if (classField.getType().isAssignableFrom(String.class)) {
                         classField.set(instance, params.get(param.name()));
-                    } else if (classField.getType().isAssignableFrom(Integer.class)) {
+                    } else if (classField.getType().isAssignableFrom(Integer.class) ||
+                            classField.getType().isAssignableFrom(Integer.TYPE)) {
                         classField.set(instance, Integer.parseInt(params.get(param.name())));
-                    } else if (classField.getType().isAssignableFrom(Long.class)) {
+                    } else if (classField.getType().isAssignableFrom(Long.class) ||
+                            classField.getType().isAssignableFrom(Long.TYPE)) {
                         classField.set(instance, Long.parseLong(params.get(param.name())));
-                    } else if (classField.getType().isAssignableFrom(Boolean.class)) {
+                    } else if (classField.getType().isAssignableFrom(Boolean.class) ||
+                            classField.getType().isAssignableFrom(Boolean.TYPE)) {
                         classField.set(instance, Boolean.parseBoolean(params.get(param.name())));
                     }
                 }

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.