Posted to commits@airavata.apache.org by di...@apache.org on 2021/06/03 04:47:32 UTC
[airavata-data-lake] branch master updated: Handling callback path of data transfer task
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new 4e69843 Handling callback path of data transfer task
4e69843 is described below
commit 4e69843ffb5059d0222a689a5269099a27d136c3
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jun 3 00:47:19 2021 -0400
Handling callback path of data transfer task
---
.../services/handler/WorkflowEngineAPIHandler.java | 3 -
.../engine/services/wm/CallbackWorkflowEntity.java | 9 ++
.../engine/services/wm/WorkflowOperator.java | 50 +++-----
.../wm/datasync/DataSyncWorkflowManager.java | 44 ++++---
.../workflow/engine/task/AbstractTask.java | 56 +--------
.../task/{AbstractTask.java => TaskUtil.java} | 137 ++++++---------------
.../engine/task/impl/AsyncDataTransferTask.java | 14 ++-
7 files changed, 99 insertions(+), 214 deletions(-)
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
index c4e44c1..dbbce74 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
@@ -40,9 +40,6 @@ public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServic
private MFTCallbackStore mftCallbackStore;
@Autowired
- private CallbackWorkflowStore callbackWorkflowStore;
-
- @Autowired
private DataSyncWorkflowManager dataSyncWorkflowManager;
@Override
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java
index 7a14573..e254b9c 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java
@@ -23,6 +23,7 @@ import java.util.Map;
public class CallbackWorkflowEntity {
private int prevSectionIndex;
+ private Map<String, Map<String, String>> taskValueMap;
private Map<String, AbstractTask> taskMap;
private String workflowId;
private String startTaskId;
@@ -35,6 +36,14 @@ public class CallbackWorkflowEntity {
this.prevSectionIndex = prevSectionIndex;
}
+ public Map<String, Map<String, String>> getTaskValueMap() {
+ return taskValueMap;
+ }
+
+ public void setTaskValueMap(Map<String, Map<String, String>> taskValueMap) {
+ this.taskValueMap = taskValueMap;
+ }
+
public Map<String, AbstractTask> getTaskMap() {
return taskMap;
}
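The new taskValueMap holds a per-task snapshot of serialized @TaskParam values, so the callback path can rebuild task state after the workflow suspends. A minimal sketch of taking that snapshot, mirroring continueNonBlockingRest in the WorkflowOperator diff below; the method and store field names here are assumptions:

    import java.util.HashMap;
    import java.util.Map;

    // Snapshot every task's @TaskParam fields into flat String maps before the
    // workflow suspends; the callback path deserializes them on resumption.
    private void snapshotForCallback(Map<String, AbstractTask> taskMap, String workflowName,
                                     String nonBlockingTaskId, int currentSection) throws Exception {
        Map<String, Map<String, String>> serialized = new HashMap<>();
        for (Map.Entry<String, AbstractTask> entry : taskMap.entrySet()) {
            serialized.put(entry.getKey(), TaskUtil.serializeTaskData(entry.getValue()));
        }
        CallbackWorkflowEntity cwe = new CallbackWorkflowEntity();
        cwe.setWorkflowId(workflowName);
        cwe.setPrevSectionIndex(currentSection);
        cwe.setTaskValueMap(serialized);
        cwe.setTaskMap(taskMap);
        cwe.setStartTaskId(nonBlockingTaskId);
        callbackWorkflowStore.saveWorkflowEntity(cwe); // store field name assumed
    }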
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
index 4eee6f1..7571057 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
@@ -21,6 +21,7 @@ import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTa
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskParamType;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskUtil;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
@@ -102,11 +103,21 @@ public class WorkflowOperator {
}
private void continueNonBlockingRest(Map<String, AbstractTask> taskMap, String workflowName,
- String nonBlockingTaskId, int currentSection) {
+ String nonBlockingTaskId, int currentSection) throws Exception {
+
+ NonBlockingTask nbTask = (NonBlockingTask) taskMap.get(nonBlockingTaskId);
+ nbTask.setCurrentSection(currentSection + 1);
+
+ Map<String, Map<String, String>> serializedMap = new HashMap<>();
+ for (String key : taskMap.keySet()) {
+ Map<String, String> stringMap = TaskUtil.serializeTaskData(taskMap.get(key));
+ serializedMap.put(key, stringMap);
+ }
CallbackWorkflowEntity cwe = new CallbackWorkflowEntity();
cwe.setWorkflowId(workflowName);
cwe.setPrevSectionIndex(currentSection);
+ cwe.setTaskValueMap(serializedMap);
cwe.setTaskMap(taskMap);
cwe.setStartTaskId(nonBlockingTaskId);
this.cbws.saveWorkflowEntity(cwe);
@@ -131,7 +142,7 @@ public class WorkflowOperator {
.setTaskId(currentTask.getTaskId())
.setCommand(taskName);
- Map<String, String> paramMap = serializeTaskData(currentTask);
+ Map<String, String> paramMap = TaskUtil.serializeTaskData(currentTask);
paramMap.forEach(taskBuilder::addConfig);
List<TaskConfig> taskBuilds = new ArrayList<>();
@@ -165,7 +176,7 @@ public class WorkflowOperator {
.setTaskId(currentTask.getTaskId())
.setCommand(taskName);
- Map<String, String> paramMap = serializeTaskData(currentTask);
+ Map<String, String> paramMap = TaskUtil.serializeTaskData(currentTask);
paramMap.forEach(taskBuilder::addConfig);
List<TaskConfig> taskBuilds = new ArrayList<>();
@@ -206,39 +217,6 @@ public class WorkflowOperator {
taskDriver.delete(workflowName);
}
- private <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
-
- Map<String, String> result = new HashMap<>();
- for (Class<?> c = data.getClass(); c != null; c = c.getSuperclass()) {
- Field[] fields = c.getDeclaredFields();
- for (Field classField : fields) {
- TaskParam parm = classField.getAnnotation(TaskParam.class);
- try {
- if (parm != null) {
- Object propertyValue = PropertyUtils.getProperty(data, classField.getName());
- if (propertyValue instanceof TaskParamType) {
- result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize());
- } else {
- result.put(parm.name(), propertyValue.toString());
- }
- }
- } catch (Exception e) {
- logger.error("Failed to serialize task parameter {} in class {}", parm.name(), data.getClass().getName());
- throw e;
- }
-
- TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
- if (outPort != null) {
- classField.setAccessible(true);
- if (classField.get(data) != null) {
- result.put(outPort.name(), ((OutPort) classField.get(data)).getNextTaskId().toString());
- }
- }
- }
- }
- return result;
- }
-
private <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T taskObj) throws IllegalAccessException {
List<OutPort> outPorts = new ArrayList<>();
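With serialization extracted into TaskUtil, the same flat map travels both ways: it is attached to the Helix TaskConfig at submission, and AbstractTask.run() later hands the config map back to TaskUtil.deserializeTaskData. A minimal sketch of the submission side, using the builder calls visible in this hunk; the final build() call is an assumption:

    // Attach serialized @TaskParam values to the Helix task config; the task
    // reads the same map back via getTaskConfig().getConfigMap() when it runs.
    TaskConfig.Builder taskBuilder = new TaskConfig.Builder()
            .setTaskId(currentTask.getTaskId())
            .setCommand(taskName);
    TaskUtil.serializeTaskData(currentTask).forEach(taskBuilder::addConfig);
    TaskConfig taskConfig = taskBuilder.build();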
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java
index 53350c4..f723ead 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java
@@ -19,29 +19,18 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.da
import org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft.DataTransferEvent;
import org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft.DataTransferEventDeserializer;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller.Controller;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.CallbackWorkflowEntity;
import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.CallbackWorkflowStore;
import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.WorkflowOperator;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskUtil;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.AsyncDataTransferTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.WebApplicationType;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
-import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
-import org.springframework.context.annotation.ComponentScan;
import java.time.Duration;
import java.util.*;
@@ -78,7 +67,7 @@ public class DataSyncWorkflowManager {
@Autowired
private CallbackWorkflowStore callbackWorkflowStore;
- private ExecutorService kafkaMessageProcessPool = Executors.newFixedThreadPool(10);
+ private final ExecutorService kafkaMessageProcessPool = Executors.newFixedThreadPool(10);
private WorkflowOperator workflowOperator;
@@ -97,9 +86,32 @@ public class DataSyncWorkflowManager {
return consumer;
}
- private boolean processKakfaMessage(DataTransferEvent dte) {
+ private boolean processCallbackMessage(DataTransferEvent dte) {
logger.info("Processing DTE for task {}, workflow {} and status {}",
dte.getTaskId(), dte.getWorkflowId(), dte.getTransferStatus());
+ Optional<CallbackWorkflowEntity> workflowEntityOp = callbackWorkflowStore.getWorkflowEntity(dte.getWorkflowId(), dte.getTaskId(), 1);
+ if (workflowEntityOp.isPresent()) {
+ logger.info("Found a callback workflow to continue workflow {}", dte.getWorkflowId());
+ CallbackWorkflowEntity callbackWorkflowEntity = workflowEntityOp.get();
+ String[] startTasks = {callbackWorkflowEntity.getStartTaskId()};
+
+ try {
+ Map<String, AbstractTask> taskMap = callbackWorkflowEntity.getTaskMap();
+ Map<String, Map<String, String>> taskValueMap = callbackWorkflowEntity.getTaskValueMap();
+
+ // Initialize task data
+ for (String key : taskMap.keySet()) {
+ TaskUtil.deserializeTaskData(taskMap.get(key), taskValueMap.get(key));
+ }
+
+ String workflowId = this.workflowOperator.buildAndRunWorkflow(taskMap, startTasks);
+ logger.info("Successfully submitted callback workflow {} for incoming workflow {}", workflowId, dte.getWorkflowId());
+ } catch (Exception e) {
+ logger.error("Failed in executing callback workflow for worrkflow {}", dte.getWorkflowId());
+ }
+ } else {
+ logger.warn("Didn't find a callback workflow for workflow {}", dte.getWorkflowId());
+ }
return true;
}
@@ -120,7 +132,7 @@ public class DataSyncWorkflowManager {
List<ConsumerRecord<String, DataTransferEvent>> partitionRecords = consumerRecords.records(partition);
for (ConsumerRecord<String, DataTransferEvent> record : partitionRecords) {
processingFutures.add(executorCompletionService.submit(() -> {
- boolean success = processKakfaMessage(record.value());
+ boolean success = processCallbackMessage(record.value());
logger.info("Processing DTE for task " + record.value().getTaskId() + " : " + success);
return success;
}));
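The hunk above shows only the submit side of the consumer loop. A minimal sketch of the full fan-out; the types, pool, and processCallbackMessage call come from this diff, while the poll timeout, the ExecutorCompletionService construction, and the blocking collection of results are assumptions:

    // Fan incoming DataTransferEvents out to the fixed worker pool and wait
    // for each one to finish before committing offsets.
    ExecutorCompletionService<Boolean> completionService =
            new ExecutorCompletionService<>(kafkaMessageProcessPool);
    ConsumerRecords<String, DataTransferEvent> consumerRecords = consumer.poll(Duration.ofSeconds(1));
    List<Future<Boolean>> processingFutures = new ArrayList<>();
    for (TopicPartition partition : consumerRecords.partitions()) {
        for (ConsumerRecord<String, DataTransferEvent> record : consumerRecords.records(partition)) {
            processingFutures.add(completionService.submit(() -> processCallbackMessage(record.value())));
        }
    }
    for (int i = 0; i < processingFutures.size(); i++) {
        completionService.take().get(); // block until each event is processed
    }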
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
index 89863fc..5573a6e 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
@@ -71,7 +71,7 @@ public abstract class AbstractTask extends UserContentStore implements Task {
this.callbackContext.set(cbc);
String helixTaskId = getCallbackContext().getTaskConfig().getId();
logger.info("Running task {}", helixTaskId);
- deserializeTaskData(this, getCallbackContext().getTaskConfig().getConfigMap());
+ TaskUtil.deserializeTaskData(this, getCallbackContext().getTaskConfig().getConfigMap());
} catch (Exception e) {
logger.error("Failed at deserializing task data", e);
return new TaskResult(TaskResult.Status.FAILED, "Failed in deserializing task data");
@@ -130,58 +130,4 @@ public abstract class AbstractTask extends UserContentStore implements Task {
logger.error("Failed to put callback context to the queue", e);
}
}
-
- private <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException {
-
- List<Field> allFields = new ArrayList<>();
- Class genericClass = instance.getClass();
-
- while (AbstractTask.class.isAssignableFrom(genericClass)) {
- Field[] declaredFields = genericClass.getDeclaredFields();
- for (Field declaredField : declaredFields) {
- allFields.add(declaredField);
- }
- genericClass = genericClass.getSuperclass();
- }
-
- for (Field classField : allFields) {
- TaskParam param = classField.getAnnotation(TaskParam.class);
- if (param != null) {
- if (params.containsKey(param.name())) {
- classField.setAccessible(true);
- PropertyDescriptor propertyDescriptor = PropertyUtils.getPropertyDescriptor(this, classField.getName());
- Method writeMethod = PropertyUtils.getWriteMethod(propertyDescriptor);
- Class<?>[] methodParamType = writeMethod.getParameterTypes();
- Class<?> writeParameterType = methodParamType[0];
-
- if (writeParameterType.isAssignableFrom(String.class)) {
- writeMethod.invoke(instance, params.get(param.name()));
- } else if (writeParameterType.isAssignableFrom(Integer.class) ||
- writeParameterType.isAssignableFrom(Integer.TYPE)) {
- writeMethod.invoke(instance, Integer.parseInt(params.get(param.name())));
- } else if (writeParameterType.isAssignableFrom(Long.class) ||
- writeParameterType.isAssignableFrom(Long.TYPE)) {
- writeMethod.invoke(instance, Long.parseLong(params.get(param.name())));
- } else if (writeParameterType.isAssignableFrom(Boolean.class) ||
- writeParameterType.isAssignableFrom(Boolean.TYPE)) {
- writeMethod.invoke(instance, Boolean.parseBoolean(params.get(param.name())));
- } else if (TaskParamType.class.isAssignableFrom(writeParameterType)) {
- Constructor<?> ctor = writeParameterType.getConstructor();
- Object obj = ctor.newInstance();
- ((TaskParamType)obj).deserialize(params.get(param.name()));
- writeMethod.invoke(instance, obj);
- }
- }
- }
- }
-
- for (Field classField : allFields) {
- TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
- if (outPort != null) {
- classField.setAccessible(true);
- OutPort op = new OutPort();
- op.setNextTaskId(params.get(outPort.name()));
- }
- }
- }
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java
similarity index 56%
copy from data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java
index 89863fc..4fc32ca 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java
@@ -20,10 +20,6 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
import org.apache.commons.beanutils.PropertyUtils;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskResult;
-import org.apache.helix.task.UserContentStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,105 +29,15 @@ import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-public abstract class AbstractTask extends UserContentStore implements Task {
+public class TaskUtil {
- private final static Logger logger = LoggerFactory.getLogger(AbstractTask.class);
+ private final static Logger logger = LoggerFactory.getLogger(TaskUtil.class);
- private ThreadLocal<TaskCallbackContext> callbackContext = new ThreadLocal<>();
- private BlockingQueue<TaskCallbackContext> callbackContextQueue = new LinkedBlockingQueue<>();
-
- @TaskOutPort(name = "nextTask")
- private OutPort outPort;
-
- @TaskParam(name = "taskId")
- private ThreadLocal<String> taskId = new ThreadLocal<>();
-
- @TaskParam(name = "retryCount")
- private ThreadLocal<Integer> retryCount = ThreadLocal.withInitial(()-> 3);
-
- public AbstractTask() {
-
- }
-
- @Override
- public TaskResult run() {
- try {
- TaskCallbackContext cbc = callbackContextQueue.poll();
-
- if (cbc == null) {
- logger.error("No callback context available");
- throw new Exception("No callback context available");
- }
-
- this.callbackContext.set(cbc);
- String helixTaskId = getCallbackContext().getTaskConfig().getId();
- logger.info("Running task {}", helixTaskId);
- deserializeTaskData(this, getCallbackContext().getTaskConfig().getConfigMap());
- } catch (Exception e) {
- logger.error("Failed at deserializing task data", e);
- return new TaskResult(TaskResult.Status.FAILED, "Failed in deserializing task data");
- }
-
- try {
- return onRun();
- } catch (Exception e) {
- logger.error("Unknown error while running task {}", getTaskId(), e);
- return new TaskResult(TaskResult.Status.FAILED, "Failed due to unknown error");
- }
- }
-
- @Override
- public void cancel() {
- onCancel();
- }
-
- public abstract TaskResult onRun();
-
- public abstract void onCancel();
-
- public OutPort getOutPort() {
- return outPort;
- }
-
- public void setOutPort(OutPort outPort) {
- this.outPort = outPort;
- }
-
- public int getRetryCount() {
- return retryCount.get();
- }
-
- public void setRetryCount(int retryCount) {
- this.retryCount.set(retryCount);
- }
-
- public TaskCallbackContext getCallbackContext() {
- return callbackContext.get();
- }
-
- public String getTaskId() {
- return taskId.get();
- }
-
- public void setTaskId(String taskId) {
- this.taskId.set(taskId);
- }
-
- public void setCallbackContext(TaskCallbackContext callbackContext) {
- logger.info("Setting callback context {}", callbackContext.getJobConfig().getId());
- try {
- this.callbackContextQueue.put(callbackContext);
- } catch (InterruptedException e) {
- logger.error("Failed to put callback context to the queue", e);
- }
- }
-
- private <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException {
+ public static <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException {
List<Field> allFields = new ArrayList<>();
Class genericClass = instance.getClass();
@@ -149,7 +55,7 @@ public abstract class AbstractTask extends UserContentStore implements Task {
if (param != null) {
if (params.containsKey(param.name())) {
classField.setAccessible(true);
- PropertyDescriptor propertyDescriptor = PropertyUtils.getPropertyDescriptor(this, classField.getName());
+ PropertyDescriptor propertyDescriptor = PropertyUtils.getPropertyDescriptor(instance, classField.getName());
Method writeMethod = PropertyUtils.getWriteMethod(propertyDescriptor);
Class<?>[] methodParamType = writeMethod.getParameterTypes();
Class<?> writeParameterType = methodParamType[0];
@@ -184,4 +90,37 @@ public abstract class AbstractTask extends UserContentStore implements Task {
}
}
}
+
+ public static <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+
+ Map<String, String> result = new HashMap<>();
+ for (Class<?> c = data.getClass(); c != null; c = c.getSuperclass()) {
+ Field[] fields = c.getDeclaredFields();
+ for (Field classField : fields) {
+ TaskParam parm = classField.getAnnotation(TaskParam.class);
+ try {
+ if (parm != null) {
+ Object propertyValue = PropertyUtils.getProperty(data, classField.getName());
+ if (propertyValue instanceof TaskParamType) {
+ result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize());
+ } else {
+ result.put(parm.name(), propertyValue.toString());
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Failed to serialize task parameter {} in class {}", parm.name(), data.getClass().getName());
+ throw e;
+ }
+
+ TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
+ if (outPort != null) {
+ classField.setAccessible(true);
+ if (classField.get(data) != null) {
+ result.put(outPort.name(), ((OutPort) classField.get(data)).getNextTaskId().toString());
+ }
+ }
+ }
+ }
+ return result;
+ }
}
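TaskUtil now owns both directions of the reflection-based mapping, so a serialize/deserialize round trip should reproduce a task's @TaskParam state. A minimal sketch of that contract; ExampleTask, its field, and main() are hypothetical and for illustration only:

    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskUtil;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
    import org.apache.helix.task.TaskResult;
    import java.util.Map;

    public class TaskUtilRoundTripSketch {
        // Hypothetical task type; only @TaskParam-annotated bean properties
        // participate in the serialize/deserialize round trip.
        public static class ExampleTask extends AbstractTask {
            @TaskParam(name = "sourcePath")
            private String sourcePath;
            public String getSourcePath() { return sourcePath; }
            public void setSourcePath(String sourcePath) { this.sourcePath = sourcePath; }
            @Override public TaskResult onRun() { return new TaskResult(TaskResult.Status.COMPLETED, "ok"); }
            @Override public void onCancel() { }
        }

        public static void main(String[] args) throws Exception {
            ExampleTask original = new ExampleTask();
            original.setTaskId("task-1");
            original.setSourcePath("/tmp/input.dat");

            // Flatten @TaskParam fields (including inherited taskId, retryCount)
            Map<String, String> params = TaskUtil.serializeTaskData(original);

            // Rebuild an equivalent task from the flat map
            ExampleTask restored = new ExampleTask();
            TaskUtil.deserializeTaskData(restored, params);
            System.out.println(restored.getSourcePath()); // prints /tmp/input.dat
        }
    }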
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
index 91b0cc4..a3c400e 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
@@ -33,9 +33,6 @@ public class AsyncDataTransferTask extends BiSectionNonBlockingTask {
@TaskParam(name = "DestinationCredToken")
private final ThreadLocal<String> destinationCredToken = new ThreadLocal<>();
- @TaskParam(name = "UserId")
- private final ThreadLocal<String> userId = new ThreadLocal<>();
-
@TaskParam(name = "CallbackUrl")
private final ThreadLocal<String> callbackUrl = new ThreadLocal<>();
@@ -45,17 +42,23 @@ public class AsyncDataTransferTask extends BiSectionNonBlockingTask {
@TaskParam(name = "MFTAPIPort")
private final ThreadLocal<Integer> mftPort = new ThreadLocal<>();
+ // Security
+ @TaskParam(name = "UserId")
+ private final ThreadLocal<String> userId = new ThreadLocal<>();
+
@TaskParam(name = "MFTClientId")
private final ThreadLocal<String> mftClientId = new ThreadLocal<>();
@TaskParam(name = "MFTClientSecret")
private final ThreadLocal<String> mftClientSecret = new ThreadLocal<>();
+ ///
+
@TaskParam(name = "MFTCallbackStoreHost")
- private ThreadLocal<String> mftCallbackStoreHost = new ThreadLocal<>();
+ private final ThreadLocal<String> mftCallbackStoreHost = new ThreadLocal<>();
@TaskParam(name = "MFTCallbackStorePort")
- private ThreadLocal<Integer> mftCallbackStorePort = new ThreadLocal<>();
+ private final ThreadLocal<Integer> mftCallbackStorePort = new ThreadLocal<>();
public TaskResult beforeSection() {
@@ -92,6 +95,7 @@ public class AsyncDataTransferTask extends BiSectionNonBlockingTask {
public TaskResult afterSection() {
logger.info("Transfer completed successfully");
+ // TODO update metadata in the Data Lake
return new TaskResult(TaskResult.Status.COMPLETED, "Section 2 completed");
}