You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/06/03 16:36:09 UTC
[airavata] branch master updated: Providing support for URI
collections in Input Data Staging
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new f8529a9 Providing support for URI collections in Input Data Staging
f8529a9 is described below
commit f8529a92d854f9b14bd25e65e6b6789a8512af48
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Jun 3 12:35:58 2019 -0400
Providing support for URI collections in Input Data Staging
---
.../impl/task/staging/InputDataStagingTask.java | 115 ++++++++++++---------
1 file changed, 64 insertions(+), 51 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
index 298925d..60120aa 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
@@ -26,6 +26,7 @@ import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.impl.task.TaskOnFailException;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.task.DataStagingTaskModel;
@@ -67,65 +68,29 @@ public class InputDataStagingTask extends DataStagingTask {
throw new TaskOnFailException(message, true, null);
}
- // Fetch and validate source and destination URLS
- URI sourceURI;
- URI destinationURI;
- String sourceFileName;
try {
- sourceURI = new URI(dataStagingTaskModel.getSource());
- destinationURI = new URI(dataStagingTaskModel.getDestination());
- if (logger.isDebugEnabled()) {
- logger.debug("Source file " + sourceURI.getPath() + ", destination uri " + destinationURI.getPath() + " for task " + getTaskId());
- }
-
- sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
- sourceURI.getPath().length());
- } catch (URISyntaxException e) {
- throw new TaskOnFailException("Failed to obtain source URI for input data staging task " + getTaskId(), true, e);
- }
+ String sourceUrls[];
- // Fetch and validate storage adaptor
- StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
-
- // Fetch and validate compute resource adaptor
- AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
-
- String localSourceFilePath = getLocalDataPath(sourceFileName);
- // Downloading input file from the storage resource
-
- try {
- try {
- logger.info("Downloading input file " + sourceURI.getPath() + " to the local path " + localSourceFilePath);
- storageResourceAdaptor.downloadFile(sourceURI.getPath(), localSourceFilePath);
- logger.info("Input file downloaded to " + localSourceFilePath);
- } catch (AgentException e) {
- throw new TaskOnFailException("Failed downloading input file " + sourceFileName + " to the local path " + localSourceFilePath, false, e);
- }
-
- File localFile = new File(localSourceFilePath);
- if (localFile.exists()) {
- if (localFile.length() == 0) {
- logger.error("Local file " + localSourceFilePath +" size is 0 so ignoring the upload");
- return onFail("Input staging has failed as file " + localSourceFilePath + " size is 0", true, null);
- }
+ if (dataStagingTaskModel.getProcessInput().getType() == DataType.URI_COLLECTION) {
+ logger.info("Found a URI collection so splitting by comma for path " + dataStagingTaskModel.getSource());
+ sourceUrls = dataStagingTaskModel.getSource().split(",");
} else {
- throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, false, null);
+ sourceUrls = new String[]{dataStagingTaskModel.getSource()};
}
- // Uploading input file to the compute resource
- try {
- logger.info("Uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath);
- adaptor.copyFileTo(localSourceFilePath, destinationURI.getPath());
- logger.info("Input file uploaded to " + destinationURI.getPath());
- } catch (AgentException e) {
- throw new TaskOnFailException("Failed uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath, false, e);
+ for (String url : sourceUrls) {
+ URI sourceURI = new URI(url);
+ URI destinationURI = new URI(dataStagingTaskModel.getDestination());
+
+ logger.info("Source file " + sourceURI.getPath() + ", destination uri " + destinationURI.getPath() + " for task " + getTaskId());
+ copySingleFile(sourceURI, destinationURI, taskHelper);
}
- } finally {
- logger.info("Deleting temporary file " + localSourceFilePath);
- deleteTempFile(localSourceFilePath);
+ } catch (URISyntaxException e) {
+ throw new TaskOnFailException("Failed to obtain source URI for input data staging task " + getTaskId(), true, e);
}
+
return onSuccess("Input data staging task " + getTaskId() + " successfully completed");
} catch (TaskOnFailException e) {
@@ -136,12 +101,60 @@ public class InputDataStagingTask extends DataStagingTask {
}
return onFail(e.getReason(), e.isCritical(), e.getError());
- }catch (Exception e) {
+ } catch (Exception e) {
logger.error("Unknown error while executing input data staging task " + getTaskId(), e);
return onFail("Unknown error while executing input data staging task " + getTaskId(), false, e);
}
}
+ private void copySingleFile(URI sourceURI, URI destinationURI, TaskHelper taskHelper) throws TaskOnFailException {
+
+ String sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
+ sourceURI.getPath().length());
+
+ // Fetch and validate storage adaptor
+ StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
+
+ // Fetch and validate compute resource adaptor
+ AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
+
+ String localSourceFilePath = getLocalDataPath(sourceFileName);
+ // Downloading input file from the storage resource
+
+ try {
+ try {
+ logger.info("Downloading input file " + sourceURI.getPath() + " to the local path " + localSourceFilePath);
+ storageResourceAdaptor.downloadFile(sourceURI.getPath(), localSourceFilePath);
+ logger.info("Input file downloaded to " + localSourceFilePath);
+ } catch (AgentException e) {
+ throw new TaskOnFailException("Failed downloading input file " + sourceFileName + " to the local path " + localSourceFilePath, false, e);
+ }
+
+ File localFile = new File(localSourceFilePath);
+ if (localFile.exists()) {
+ if (localFile.length() == 0) {
+ logger.error("Local file " + localSourceFilePath +" size is 0 so ignoring the upload");
+ throw new TaskOnFailException("Input staging has failed as file " + localSourceFilePath + " size is 0", true, null);
+ }
+ } else {
+ throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, false, null);
+ }
+
+ // Uploading input file to the compute resource
+ try {
+ logger.info("Uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath);
+ adaptor.copyFileTo(localSourceFilePath, destinationURI.getPath());
+ logger.info("Input file uploaded to " + destinationURI.getPath());
+ } catch (AgentException e) {
+ throw new TaskOnFailException("Failed uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath, false, e);
+ }
+
+ } finally {
+ logger.info("Deleting temporary file " + localSourceFilePath);
+ deleteTempFile(localSourceFilePath);
+ }
+ }
+
@Override
public void onCancel(TaskContext taskContext) {