You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/09/03 18:50:13 UTC
[airavata-data-lake] branch master updated: Pre defining download path for parsing to rectify helix context issue
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new d8d0867 Pre defining download path for parsing to rectify helix context issue
d8d0867 is described below
commit d8d08677749b9d3b425c4d74e189ba792d88ac65
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Fri Sep 3 14:49:56 2021 -0400
Pre defining download path for parsing to rectify helix context issue
---
.../wm/datasync/DataParsingWorkflowManager.java | 8 ++++++--
.../engine/task/impl/MetadataPersistTask.java | 1 +
.../engine/task/impl/SyncLocalDataDownloadTask.java | 19 ++++++++++++++-----
3 files changed, 21 insertions(+), 7 deletions(-)
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
index b568e75..1e547e4 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
@@ -122,6 +122,8 @@ public class DataParsingWorkflowManager {
ParsingJobListResponse parsingJobs = parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());
+ String tempDownloadPath = "/tmp/" + UUID.randomUUID().toString();
+
Map<String, StringMap> parserInputMappings = new HashMap<>();
List<DataParsingJob> selectedPJs = parsingJobs.getParsersList().stream().filter(pj -> {
List<DataParsingJobInput> pjis = pj.getDataParsingJobInputsList();
@@ -137,7 +139,7 @@ public class DataParsingWorkflowManager {
bindings.put("metadata", metadata);
try {
Boolean eval = (Boolean) engine.eval(pji.getSelectionQuery());
- stringMap.put(pji.getDataParserInputInterfaceId(), "$DOWNLOAD_PATH");
+ stringMap.put(pji.getDataParserInputInterfaceId(), tempDownloadPath);
match = match && eval;
} catch (ScriptException e) {
logger.error("Failed to evaluate parsing job {}", pj.getDataParsingJobId());
@@ -169,6 +171,7 @@ public class DataParsingWorkflowManager {
downloadTask.setMftPort(mftPort);
downloadTask.setSourceResourceId(sourceResourceId);
downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
+ downloadTask.setDownloadPath(tempDownloadPath);
taskMap.put(downloadTask.getTaskId(), downloadTask);
@@ -205,7 +208,8 @@ public class DataParsingWorkflowManager {
mpt.setServiceAccountKey(mftClientId);
mpt.setServiceAccountSecret(mftClientSecret);
mpt.setResourceId(sourceResourceId);
- mpt.setJsonFile("$" + dataParsingTask.getTaskId() + "-" + dataParserOutputInterface.getOutputName());
+ mpt.setJsonFile("$" + dataParsingTask.getTaskId() +
+ "-" + dataParserOutputInterface.getOutputName());
OutPort dpOut = new OutPort();
dpOut.setNextTaskId(mpt.getTaskId());
dataParsingTask.addOutPort(dpOut);
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java
index 3abc375..f2f3756 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java
@@ -72,6 +72,7 @@ public class MetadataPersistTask extends BlockingTask {
String derivedFilePath = getJsonFile();
if (derivedFilePath.startsWith("$")) {
+ logger.info("Fetching json file path from cotext for key {}", derivedFilePath);
derivedFilePath = getUserContent(derivedFilePath.substring(1), Scope.WORKFLOW);
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java
index b8959c1..01bbd26 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java
@@ -63,6 +63,9 @@ public class SyncLocalDataDownloadTask extends BlockingTask {
@TaskParam(name = "SourceCredToken")
private final ThreadLocal<String> sourceCredToken = new ThreadLocal<>();
+ @TaskParam(name = "DownloadPath")
+ private final ThreadLocal<String> downloadPath = new ThreadLocal<>();
+
public static void main(String args[]) {
@@ -117,11 +120,10 @@ public class SyncLocalDataDownloadTask extends BlockingTask {
}
String downloadUrl = httpDownloadApiResponse.getUrl();
- logger.info("Using download URL {}", downloadUrl);
+ logger.info("Using download URL {} to download file {}", downloadUrl, metadata.getFriendlyName());
- String downloadPath = "/tmp/" + metadata.getFriendlyName();
try (BufferedInputStream in = new BufferedInputStream(new URL(downloadUrl).openStream());
- FileOutputStream fileOutputStream = new FileOutputStream(downloadPath)) {
+ FileOutputStream fileOutputStream = new FileOutputStream(getDownloadPath())) {
byte dataBuffer[] = new byte[1024];
int bytesRead;
while ((bytesRead = in.read(dataBuffer, 0, 1024)) != -1) {
@@ -132,9 +134,8 @@ public class SyncLocalDataDownloadTask extends BlockingTask {
return new TaskResult(TaskResult.Status.FAILED, "Failed to download file");
}
- logger.info("Downloaded to path {}", downloadPath);
+ logger.info("Downloaded filr {} to path {}", metadata.getFriendlyName(), getDownloadPath());
- putUserContent("DOWNLOAD_PATH", downloadPath, Scope.WORKFLOW);
return new TaskResult(TaskResult.Status.COMPLETED, "Success");
}
@@ -201,4 +202,12 @@ public class SyncLocalDataDownloadTask extends BlockingTask {
public void setTenantId(String tenantId) {
this.tenantId.set(tenantId);
}
+
+ public String getDownloadPath() {
+ return downloadPath.get();
+ }
+
+ public void setDownloadPath(String downloadPath) {
+ this.downloadPath.set(downloadPath);
+ }
}