You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/09/03 18:50:13 UTC

[airavata-data-lake] branch master updated: Predefine download path for parsing to rectify Helix context issue

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new d8d0867  Pre defining download path for parsing to rectify helix context issue
d8d0867 is described below

commit d8d08677749b9d3b425c4d74e189ba792d88ac65
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Fri Sep 3 14:49:56 2021 -0400

    Predefine download path for parsing to rectify Helix context issue
---
 .../wm/datasync/DataParsingWorkflowManager.java       |  8 ++++++--
 .../engine/task/impl/MetadataPersistTask.java         |  1 +
 .../engine/task/impl/SyncLocalDataDownloadTask.java   | 19 ++++++++++++++-----
 3 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
index b568e75..1e547e4 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
@@ -122,6 +122,8 @@ public class DataParsingWorkflowManager {
 
             ParsingJobListResponse parsingJobs = parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());
 
+            String tempDownloadPath = "/tmp/" + UUID.randomUUID().toString();
+
             Map<String, StringMap> parserInputMappings = new HashMap<>();
             List<DataParsingJob> selectedPJs = parsingJobs.getParsersList().stream().filter(pj -> {
                 List<DataParsingJobInput> pjis = pj.getDataParsingJobInputsList();
@@ -137,7 +139,7 @@ public class DataParsingWorkflowManager {
                     bindings.put("metadata", metadata);
                     try {
                         Boolean eval = (Boolean) engine.eval(pji.getSelectionQuery());
-                        stringMap.put(pji.getDataParserInputInterfaceId(), "$DOWNLOAD_PATH");
+                        stringMap.put(pji.getDataParserInputInterfaceId(), tempDownloadPath);
                         match = match && eval;
                     } catch (ScriptException e) {
                         logger.error("Failed to evaluate parsing job {}", pj.getDataParsingJobId());
@@ -169,6 +171,7 @@ public class DataParsingWorkflowManager {
             downloadTask.setMftPort(mftPort);
             downloadTask.setSourceResourceId(sourceResourceId);
             downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
+            downloadTask.setDownloadPath(tempDownloadPath);
 
             taskMap.put(downloadTask.getTaskId(), downloadTask);
 
@@ -205,7 +208,8 @@ public class DataParsingWorkflowManager {
                         mpt.setServiceAccountKey(mftClientId);
                         mpt.setServiceAccountSecret(mftClientSecret);
                         mpt.setResourceId(sourceResourceId);
-                        mpt.setJsonFile("$" + dataParsingTask.getTaskId() + "-" + dataParserOutputInterface.getOutputName());
+                        mpt.setJsonFile("$" + dataParsingTask.getTaskId() +
+                                "-" + dataParserOutputInterface.getOutputName());
                         OutPort dpOut = new OutPort();
                         dpOut.setNextTaskId(mpt.getTaskId());
                         dataParsingTask.addOutPort(dpOut);
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java
index 3abc375..f2f3756 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java
@@ -72,6 +72,7 @@ public class MetadataPersistTask extends BlockingTask {
 
         String derivedFilePath = getJsonFile();
         if (derivedFilePath.startsWith("$")) {
+            logger.info("Fetching json file path from cotext for key {}", derivedFilePath);
             derivedFilePath = getUserContent(derivedFilePath.substring(1), Scope.WORKFLOW);
         }
 
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java
index b8959c1..01bbd26 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java
@@ -63,6 +63,9 @@ public class SyncLocalDataDownloadTask extends BlockingTask {
     @TaskParam(name = "SourceCredToken")
     private final ThreadLocal<String> sourceCredToken = new ThreadLocal<>();
 
+    @TaskParam(name = "DownloadPath")
+    private final ThreadLocal<String> downloadPath = new ThreadLocal<>();
+
     public static void main(String args[]) {
 
 
@@ -117,11 +120,10 @@ public class SyncLocalDataDownloadTask extends BlockingTask {
         }
 
         String downloadUrl = httpDownloadApiResponse.getUrl();
-        logger.info("Using download URL {}", downloadUrl);
+        logger.info("Using download URL {} to download file {}", downloadUrl, metadata.getFriendlyName());
 
-        String downloadPath = "/tmp/" + metadata.getFriendlyName();
         try (BufferedInputStream in = new BufferedInputStream(new URL(downloadUrl).openStream());
-             FileOutputStream fileOutputStream = new FileOutputStream(downloadPath)) {
+             FileOutputStream fileOutputStream = new FileOutputStream(getDownloadPath())) {
                 byte dataBuffer[] = new byte[1024];
                 int bytesRead;
                 while ((bytesRead = in.read(dataBuffer, 0, 1024)) != -1) {
@@ -132,9 +134,8 @@ public class SyncLocalDataDownloadTask extends BlockingTask {
             return new TaskResult(TaskResult.Status.FAILED, "Failed to download file");
         }
 
-        logger.info("Downloaded to path {}", downloadPath);
+        logger.info("Downloaded filr {} to path {}", metadata.getFriendlyName(), getDownloadPath());
 
-        putUserContent("DOWNLOAD_PATH", downloadPath, Scope.WORKFLOW);
         return new TaskResult(TaskResult.Status.COMPLETED, "Success");
     }
 
@@ -201,4 +202,12 @@ public class SyncLocalDataDownloadTask extends BlockingTask {
     public void setTenantId(String tenantId) {
         this.tenantId.set(tenantId);
     }
+
+    public String getDownloadPath() {
+        return downloadPath.get();
+    }
+
+    public void setDownloadPath(String downloadPath) {
+        this.downloadPath.set(downloadPath);
+    }
 }