You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/06/03 16:36:09 UTC

[airavata] branch master updated: Providing support for URI collections in Input Data Staging

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/master by this push:
     new f8529a9  Providing support for URI collections in Input Data Staging
f8529a9 is described below

commit f8529a92d854f9b14bd25e65e6b6789a8512af48
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Jun 3 12:35:58 2019 -0400

    Providing support for URI collections in Input Data Staging
---
 .../impl/task/staging/InputDataStagingTask.java    | 115 ++++++++++++---------
 1 file changed, 64 insertions(+), 51 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
index 298925d..60120aa 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
@@ -26,6 +26,7 @@ import org.apache.airavata.helix.impl.task.TaskContext;
 import org.apache.airavata.helix.impl.task.TaskOnFailException;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.task.DataStagingTaskModel;
@@ -67,65 +68,29 @@ public class InputDataStagingTask extends DataStagingTask {
                 throw new TaskOnFailException(message, true, null);
             }
 
-            // Fetch and validate source and destination URLS
-            URI sourceURI;
-            URI destinationURI;
-            String sourceFileName;
             try {
-                sourceURI = new URI(dataStagingTaskModel.getSource());
-                destinationURI = new URI(dataStagingTaskModel.getDestination());
 
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Source file " + sourceURI.getPath() + ", destination uri " + destinationURI.getPath() + " for task " + getTaskId());
-                }
-
-                sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
-                        sourceURI.getPath().length());
-            } catch (URISyntaxException e) {
-                throw new TaskOnFailException("Failed to obtain source URI for input data staging task " + getTaskId(), true, e);
-            }
+                String sourceUrls[];
 
-            // Fetch and validate storage adaptor
-            StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
-
-            // Fetch and validate compute resource adaptor
-            AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
-
-            String localSourceFilePath = getLocalDataPath(sourceFileName);
-            // Downloading input file from the storage resource
-
-            try {
-                try {
-                    logger.info("Downloading input file " + sourceURI.getPath() + " to the local path " + localSourceFilePath);
-                    storageResourceAdaptor.downloadFile(sourceURI.getPath(), localSourceFilePath);
-                    logger.info("Input file downloaded to " + localSourceFilePath);
-                } catch (AgentException e) {
-                    throw new TaskOnFailException("Failed downloading input file " + sourceFileName + " to the local path " + localSourceFilePath, false, e);
-                }
-
-                File localFile = new File(localSourceFilePath);
-                if (localFile.exists()) {
-                    if (localFile.length() == 0) {
-                        logger.error("Local file " + localSourceFilePath +" size is 0 so ignoring the upload");
-                        return onFail("Input staging has failed as file " + localSourceFilePath + " size is 0", true, null);
-                    }
+                if (dataStagingTaskModel.getProcessInput().getType() == DataType.URI_COLLECTION) {
+                    logger.info("Found a URI collection so splitting by comma for path " + dataStagingTaskModel.getSource());
+                    sourceUrls = dataStagingTaskModel.getSource().split(",");
                 } else {
-                    throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, false, null);
+                    sourceUrls = new String[]{dataStagingTaskModel.getSource()};
                 }
 
-                // Uploading input file to the compute resource
-                try {
-                    logger.info("Uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath);
-                    adaptor.copyFileTo(localSourceFilePath, destinationURI.getPath());
-                    logger.info("Input file uploaded to " + destinationURI.getPath());
-                } catch (AgentException e) {
-                    throw new TaskOnFailException("Failed uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath, false, e);
+                for (String url : sourceUrls) {
+                    URI sourceURI = new URI(url);
+                    URI destinationURI = new URI(dataStagingTaskModel.getDestination());
+
+                    logger.info("Source file " + sourceURI.getPath() + ", destination uri " + destinationURI.getPath() + " for task " + getTaskId());
+                    copySingleFile(sourceURI, destinationURI, taskHelper);
                 }
 
-            } finally {
-                logger.info("Deleting temporary file " + localSourceFilePath);
-                deleteTempFile(localSourceFilePath);
+            } catch (URISyntaxException e) {
+                throw new TaskOnFailException("Failed to obtain source URI for input data staging task " + getTaskId(), true, e);
             }
+
             return onSuccess("Input data staging task " + getTaskId() + " successfully completed");
 
         } catch (TaskOnFailException e) {
@@ -136,12 +101,60 @@ public class InputDataStagingTask extends DataStagingTask {
             }
             return onFail(e.getReason(), e.isCritical(), e.getError());
 
-        }catch (Exception e) {
+        } catch (Exception e) {
             logger.error("Unknown error while executing input data staging task " + getTaskId(), e);
             return onFail("Unknown error while executing input data staging task " + getTaskId(), false,  e);
         }
     }
 
+    private void copySingleFile(URI sourceURI, URI destinationURI, TaskHelper taskHelper) throws TaskOnFailException {
+
+        String sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
+                    sourceURI.getPath().length());
+
+        // Fetch and validate storage adaptor
+        StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
+
+        // Fetch and validate compute resource adaptor
+        AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
+
+        String localSourceFilePath = getLocalDataPath(sourceFileName);
+        // Downloading input file from the storage resource
+
+        try {
+            try {
+                logger.info("Downloading input file " + sourceURI.getPath() + " to the local path " + localSourceFilePath);
+                storageResourceAdaptor.downloadFile(sourceURI.getPath(), localSourceFilePath);
+                logger.info("Input file downloaded to " + localSourceFilePath);
+            } catch (AgentException e) {
+                throw new TaskOnFailException("Failed downloading input file " + sourceFileName + " to the local path " + localSourceFilePath, false, e);
+            }
+
+            File localFile = new File(localSourceFilePath);
+            if (localFile.exists()) {
+                if (localFile.length() == 0) {
+                    logger.error("Local file " + localSourceFilePath +" size is 0 so ignoring the upload");
+                    throw new TaskOnFailException("Input staging has failed as file " + localSourceFilePath + " size is 0", true, null);
+                }
+            } else {
+                throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, false, null);
+            }
+
+            // Uploading input file to the compute resource
+            try {
+                logger.info("Uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath);
+                adaptor.copyFileTo(localSourceFilePath, destinationURI.getPath());
+                logger.info("Input file uploaded to " + destinationURI.getPath());
+            } catch (AgentException e) {
+                throw new TaskOnFailException("Failed uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath, false, e);
+            }
+
+        } finally {
+            logger.info("Deleting temporary file " + localSourceFilePath);
+            deleteTempFile(localSourceFilePath);
+        }
+    }
+
     @Override
     public void onCancel(TaskContext taskContext) {