You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/06/03 04:47:32 UTC

[airavata-data-lake] branch master updated: Handling callback path of data transfer task

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e69843  Handling callback path of data transfer task
4e69843 is described below

commit 4e69843ffb5059d0222a689a5269099a27d136c3
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jun 3 00:47:19 2021 -0400

    Handling callback path of data transfer task
---
 .../services/handler/WorkflowEngineAPIHandler.java |   3 -
 .../engine/services/wm/CallbackWorkflowEntity.java |   9 ++
 .../engine/services/wm/WorkflowOperator.java       |  50 +++-----
 .../wm/datasync/DataSyncWorkflowManager.java       |  44 ++++---
 .../workflow/engine/task/AbstractTask.java         |  56 +--------
 .../task/{AbstractTask.java => TaskUtil.java}      | 137 ++++++---------------
 .../engine/task/impl/AsyncDataTransferTask.java    |  14 ++-
 7 files changed, 99 insertions(+), 214 deletions(-)

diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
index c4e44c1..dbbce74 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
@@ -40,9 +40,6 @@ public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServic
     private MFTCallbackStore mftCallbackStore;
 
     @Autowired
-    private CallbackWorkflowStore callbackWorkflowStore;
-
-    @Autowired
     private DataSyncWorkflowManager dataSyncWorkflowManager;
 
     @Override
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java
index 7a14573..e254b9c 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 public class CallbackWorkflowEntity {
     private int prevSectionIndex;
+    private Map<String, Map<String, String>> taskValueMap;
     private Map<String, AbstractTask> taskMap;
     private String workflowId;
     private String startTaskId;
@@ -35,6 +36,14 @@ public class CallbackWorkflowEntity {
         this.prevSectionIndex = prevSectionIndex;
     }
 
+    public Map<String, Map<String, String>> getTaskValueMap() {
+        return taskValueMap;
+    }
+
+    public void setTaskValueMap(Map<String, Map<String, String>> taskValueMap) {
+        this.taskValueMap = taskValueMap;
+    }
+
     public Map<String, AbstractTask> getTaskMap() {
         return taskMap;
     }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
index 4eee6f1..7571057 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
@@ -21,6 +21,7 @@ import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTa
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskParamType;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskUtil;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
@@ -102,11 +103,21 @@ public class WorkflowOperator {
     }
 
     private void continueNonBlockingRest(Map<String, AbstractTask> taskMap, String workflowName,
-                                         String nonBlockingTaskId, int currentSection) {
+                                         String nonBlockingTaskId, int currentSection) throws Exception {
+
+        NonBlockingTask nbTask = (NonBlockingTask) taskMap.get(nonBlockingTaskId);
+        nbTask.setCurrentSection(currentSection + 1);
+
+        Map<String, Map<String, String>> serializedMap = new HashMap<>();
+        for (String key : taskMap.keySet()) {
+            Map<String, String> stringMap = TaskUtil.serializeTaskData(taskMap.get(key));
+            serializedMap.put(key, stringMap);
+        }
 
         CallbackWorkflowEntity cwe = new CallbackWorkflowEntity();
         cwe.setWorkflowId(workflowName);
         cwe.setPrevSectionIndex(currentSection);
+        cwe.setTaskValueMap(serializedMap);
         cwe.setTaskMap(taskMap);
         cwe.setStartTaskId(nonBlockingTaskId);
         this.cbws.saveWorkflowEntity(cwe);
@@ -131,7 +142,7 @@ public class WorkflowOperator {
                     .setTaskId(currentTask.getTaskId())
                     .setCommand(taskName);
 
-            Map<String, String> paramMap = serializeTaskData(currentTask);
+            Map<String, String> paramMap = TaskUtil.serializeTaskData(currentTask);
             paramMap.forEach(taskBuilder::addConfig);
 
             List<TaskConfig> taskBuilds = new ArrayList<>();
@@ -165,7 +176,7 @@ public class WorkflowOperator {
                     .setTaskId(currentTask.getTaskId())
                     .setCommand(taskName);
 
-            Map<String, String> paramMap = serializeTaskData(currentTask);
+            Map<String, String> paramMap = TaskUtil.serializeTaskData(currentTask);
             paramMap.forEach(taskBuilder::addConfig);
 
             List<TaskConfig> taskBuilds = new ArrayList<>();
@@ -206,39 +217,6 @@ public class WorkflowOperator {
         taskDriver.delete(workflowName);
     }
 
-    private <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
-
-        Map<String, String> result = new HashMap<>();
-        for (Class<?> c = data.getClass(); c != null; c = c.getSuperclass()) {
-            Field[] fields = c.getDeclaredFields();
-            for (Field classField : fields) {
-                TaskParam parm = classField.getAnnotation(TaskParam.class);
-                try {
-                    if (parm != null) {
-                        Object propertyValue = PropertyUtils.getProperty(data, classField.getName());
-                        if (propertyValue instanceof TaskParamType) {
-                            result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize());
-                        } else {
-                            result.put(parm.name(), propertyValue.toString());
-                        }
-                    }
-                } catch (Exception e) {
-                    logger.error("Failed to serialize task parameter {} in class {}", parm.name(), data.getClass().getName());
-                    throw e;
-                }
-
-                TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
-                if (outPort != null) {
-                    classField.setAccessible(true);
-                    if (classField.get(data) != null) {
-                        result.put(outPort.name(), ((OutPort) classField.get(data)).getNextTaskId().toString());
-                    }
-                }
-            }
-        }
-        return result;
-    }
-
     private <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T taskObj) throws IllegalAccessException {
 
         List<OutPort> outPorts = new ArrayList<>();
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java
index 53350c4..f723ead 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java
@@ -19,29 +19,18 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.da
 
 import org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft.DataTransferEvent;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft.DataTransferEventDeserializer;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller.Controller;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.CallbackWorkflowEntity;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.CallbackWorkflowStore;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.WorkflowOperator;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskUtil;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.AsyncDataTransferTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.WebApplicationType;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
-import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
-import org.springframework.context.annotation.ComponentScan;
 
 import java.time.Duration;
 import java.util.*;
@@ -78,7 +67,7 @@ public class DataSyncWorkflowManager {
     @Autowired
     private CallbackWorkflowStore callbackWorkflowStore;
 
-    private ExecutorService kafkaMessageProcessPool = Executors.newFixedThreadPool(10);
+    private final ExecutorService kafkaMessageProcessPool = Executors.newFixedThreadPool(10);
 
     private WorkflowOperator workflowOperator;
 
@@ -97,9 +86,32 @@ public class DataSyncWorkflowManager {
         return consumer;
     }
 
-    private boolean processKakfaMessage(DataTransferEvent dte) {
+    private boolean processCallbackMessage(DataTransferEvent dte) {
         logger.info("Processing DTE for task {}, workflow {} and status {}",
                 dte.getTaskId(), dte.getWorkflowId(), dte.getTransferStatus());
+        Optional<CallbackWorkflowEntity> workflowEntityOp = callbackWorkflowStore.getWorkflowEntity(dte.getWorkflowId(), dte.getTaskId(), 1);
+        if (workflowEntityOp.isPresent()) {
+            logger.info("Found a callback workflow to continue workflow {}", dte.getWorkflowId());
+            CallbackWorkflowEntity callbackWorkflowEntity = workflowEntityOp.get();
+            String[] startTasks = {callbackWorkflowEntity.getStartTaskId()};
+
+            try {
+                Map<String, AbstractTask> taskMap = callbackWorkflowEntity.getTaskMap();
+                Map<String, Map<String, String>> taskValueMap = callbackWorkflowEntity.getTaskValueMap();
+
+                // Initialize task data
+                for (String key : taskMap.keySet()) {
+                    TaskUtil.deserializeTaskData(taskMap.get(key), taskValueMap.get(key));
+                }
+
+                String workflowId = this.workflowOperator.buildAndRunWorkflow(taskMap, startTasks);
+                logger.info("Successfully submitted callback workflow {} for incoming workflow {}", workflowId, dte.getWorkflowId());
+            } catch (Exception e) {
+                logger.error("Failed in executing callback workflow for worrkflow {}", dte.getWorkflowId());
+            }
+        } else {
+            logger.warn("Didn't find a callback workflow for workflow {}", dte.getWorkflowId());
+        }
         return true;
     }
 
@@ -120,7 +132,7 @@ public class DataSyncWorkflowManager {
                         List<ConsumerRecord<String, DataTransferEvent>> partitionRecords = consumerRecords.records(partition);
                         for (ConsumerRecord<String, DataTransferEvent> record : partitionRecords) {
                             processingFutures.add(executorCompletionService.submit(() -> {
-                                boolean success = processKakfaMessage(record.value());
+                                boolean success = processCallbackMessage(record.value());
                                 logger.info("Processing DTE for task " + record.value().getTaskId() + " : " + success);
                                 return success;
                             }));
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
index 89863fc..5573a6e 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
@@ -71,7 +71,7 @@ public abstract class AbstractTask extends UserContentStore implements Task {
             this.callbackContext.set(cbc);
             String helixTaskId = getCallbackContext().getTaskConfig().getId();
             logger.info("Running task {}", helixTaskId);
-            deserializeTaskData(this, getCallbackContext().getTaskConfig().getConfigMap());
+            TaskUtil.deserializeTaskData(this, getCallbackContext().getTaskConfig().getConfigMap());
         } catch (Exception e) {
             logger.error("Failed at deserializing task data", e);
             return new TaskResult(TaskResult.Status.FAILED, "Failed in deserializing task data");
@@ -130,58 +130,4 @@ public abstract class AbstractTask extends UserContentStore implements Task {
             logger.error("Failed to put callback context to the queue", e);
         }
     }
-
-    private <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException {
-
-        List<Field> allFields = new ArrayList<>();
-        Class genericClass = instance.getClass();
-
-        while (AbstractTask.class.isAssignableFrom(genericClass)) {
-            Field[] declaredFields = genericClass.getDeclaredFields();
-            for (Field declaredField : declaredFields) {
-                allFields.add(declaredField);
-            }
-            genericClass = genericClass.getSuperclass();
-        }
-
-        for (Field classField : allFields) {
-            TaskParam param = classField.getAnnotation(TaskParam.class);
-            if (param != null) {
-                if (params.containsKey(param.name())) {
-                    classField.setAccessible(true);
-                    PropertyDescriptor propertyDescriptor = PropertyUtils.getPropertyDescriptor(this, classField.getName());
-                    Method writeMethod = PropertyUtils.getWriteMethod(propertyDescriptor);
-                    Class<?>[] methodParamType = writeMethod.getParameterTypes();
-                    Class<?> writeParameterType = methodParamType[0];
-
-                    if (writeParameterType.isAssignableFrom(String.class)) {
-                        writeMethod.invoke(instance, params.get(param.name()));
-                    } else if (writeParameterType.isAssignableFrom(Integer.class) ||
-                            writeParameterType.isAssignableFrom(Integer.TYPE)) {
-                        writeMethod.invoke(instance, Integer.parseInt(params.get(param.name())));
-                    } else if (writeParameterType.isAssignableFrom(Long.class) ||
-                            writeParameterType.isAssignableFrom(Long.TYPE)) {
-                        writeMethod.invoke(instance, Long.parseLong(params.get(param.name())));
-                    } else if (writeParameterType.isAssignableFrom(Boolean.class) ||
-                            writeParameterType.isAssignableFrom(Boolean.TYPE)) {
-                        writeMethod.invoke(instance, Boolean.parseBoolean(params.get(param.name())));
-                    } else if (TaskParamType.class.isAssignableFrom(writeParameterType)) {
-                        Constructor<?> ctor = writeParameterType.getConstructor();
-                        Object obj = ctor.newInstance();
-                        ((TaskParamType)obj).deserialize(params.get(param.name()));
-                        writeMethod.invoke(instance, obj);
-                    }
-                }
-            }
-        }
-
-        for (Field classField : allFields) {
-            TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
-            if (outPort != null) {
-                classField.setAccessible(true);
-                OutPort op = new OutPort();
-                op.setNextTaskId(params.get(outPort.name()));
-            }
-        }
-    }
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java
similarity index 56%
copy from data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java
index 89863fc..4fc32ca 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java
@@ -20,10 +20,6 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
 import org.apache.commons.beanutils.PropertyUtils;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskResult;
-import org.apache.helix.task.UserContentStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,105 +29,15 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
-public abstract class AbstractTask extends UserContentStore implements Task {
+public class TaskUtil {
 
-    private final static Logger logger = LoggerFactory.getLogger(AbstractTask.class);
+    private final static Logger logger = LoggerFactory.getLogger(TaskUtil.class);
 
-    private ThreadLocal<TaskCallbackContext> callbackContext = new ThreadLocal<>();
-    private BlockingQueue<TaskCallbackContext> callbackContextQueue = new LinkedBlockingQueue<>();
-
-    @TaskOutPort(name = "nextTask")
-    private OutPort outPort;
-
-    @TaskParam(name = "taskId")
-    private ThreadLocal<String> taskId = new ThreadLocal<>();
-
-    @TaskParam(name = "retryCount")
-    private ThreadLocal<Integer> retryCount = ThreadLocal.withInitial(()-> 3);
-
-    public AbstractTask() {
-
-    }
-
-    @Override
-    public TaskResult run() {
-        try {
-            TaskCallbackContext cbc = callbackContextQueue.poll();
-
-            if (cbc == null) {
-                logger.error("No callback context available");
-                throw new Exception("No callback context available");
-            }
-
-            this.callbackContext.set(cbc);
-            String helixTaskId = getCallbackContext().getTaskConfig().getId();
-            logger.info("Running task {}", helixTaskId);
-            deserializeTaskData(this, getCallbackContext().getTaskConfig().getConfigMap());
-        } catch (Exception e) {
-            logger.error("Failed at deserializing task data", e);
-            return new TaskResult(TaskResult.Status.FAILED, "Failed in deserializing task data");
-        }
-
-        try {
-            return onRun();
-        } catch (Exception e) {
-            logger.error("Unknown error while running task {}", getTaskId(), e);
-            return new TaskResult(TaskResult.Status.FAILED, "Failed due to unknown error");
-        }
-    }
-
-    @Override
-    public void cancel() {
-        onCancel();
-    }
-
-    public abstract TaskResult onRun();
-
-    public abstract void onCancel();
-
-    public OutPort getOutPort() {
-        return outPort;
-    }
-
-    public void setOutPort(OutPort outPort) {
-        this.outPort = outPort;
-    }
-
-    public int getRetryCount() {
-        return retryCount.get();
-    }
-
-    public void setRetryCount(int retryCount) {
-        this.retryCount.set(retryCount);
-    }
-
-    public TaskCallbackContext getCallbackContext() {
-        return callbackContext.get();
-    }
-
-    public String getTaskId() {
-        return taskId.get();
-    }
-
-    public void setTaskId(String taskId) {
-        this.taskId.set(taskId);
-    }
-
-    public void setCallbackContext(TaskCallbackContext callbackContext) {
-        logger.info("Setting callback context {}", callbackContext.getJobConfig().getId());
-        try {
-            this.callbackContextQueue.put(callbackContext);
-        } catch (InterruptedException e) {
-            logger.error("Failed to put callback context to the queue", e);
-        }
-    }
-
-    private <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException {
+    public static <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException {
 
         List<Field> allFields = new ArrayList<>();
         Class genericClass = instance.getClass();
@@ -149,7 +55,7 @@ public abstract class AbstractTask extends UserContentStore implements Task {
             if (param != null) {
                 if (params.containsKey(param.name())) {
                     classField.setAccessible(true);
-                    PropertyDescriptor propertyDescriptor = PropertyUtils.getPropertyDescriptor(this, classField.getName());
+                    PropertyDescriptor propertyDescriptor = PropertyUtils.getPropertyDescriptor(instance, classField.getName());
                     Method writeMethod = PropertyUtils.getWriteMethod(propertyDescriptor);
                     Class<?>[] methodParamType = writeMethod.getParameterTypes();
                     Class<?> writeParameterType = methodParamType[0];
@@ -184,4 +90,37 @@ public abstract class AbstractTask extends UserContentStore implements Task {
             }
         }
     }
+
+    public static <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+
+        Map<String, String> result = new HashMap<>();
+        for (Class<?> c = data.getClass(); c != null; c = c.getSuperclass()) {
+            Field[] fields = c.getDeclaredFields();
+            for (Field classField : fields) {
+                TaskParam parm = classField.getAnnotation(TaskParam.class);
+                try {
+                    if (parm != null) {
+                        Object propertyValue = PropertyUtils.getProperty(data, classField.getName());
+                        if (propertyValue instanceof TaskParamType) {
+                            result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize());
+                        } else {
+                            result.put(parm.name(), propertyValue.toString());
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to serialize task parameter {} in class {}", parm.name(), data.getClass().getName());
+                    throw e;
+                }
+
+                TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
+                if (outPort != null) {
+                    classField.setAccessible(true);
+                    if (classField.get(data) != null) {
+                        result.put(outPort.name(), ((OutPort) classField.get(data)).getNextTaskId().toString());
+                    }
+                }
+            }
+        }
+        return result;
+    }
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
index 91b0cc4..a3c400e 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
@@ -33,9 +33,6 @@ public class AsyncDataTransferTask extends BiSectionNonBlockingTask {
     @TaskParam(name = "DestinationCredToken")
     private final ThreadLocal<String> destinationCredToken = new ThreadLocal<>();
 
-    @TaskParam(name = "UserId")
-    private final ThreadLocal<String> userId = new ThreadLocal<>();
-
     @TaskParam(name = "CallbackUrl")
     private final ThreadLocal<String> callbackUrl = new ThreadLocal<>();
 
@@ -45,17 +42,23 @@ public class AsyncDataTransferTask extends BiSectionNonBlockingTask {
     @TaskParam(name = "MFTAPIPort")
     private final ThreadLocal<Integer> mftPort = new ThreadLocal<>();
 
+    // Security
+    @TaskParam(name = "UserId")
+    private final ThreadLocal<String> userId = new ThreadLocal<>();
+
     @TaskParam(name = "MFTClientId")
     private final ThreadLocal<String> mftClientId = new ThreadLocal<>();
 
     @TaskParam(name = "MFTClientSecret")
     private final ThreadLocal<String> mftClientSecret = new ThreadLocal<>();
 
+    ///
+
     @TaskParam(name = "MFTCallbackStoreHost")
-    private ThreadLocal<String> mftCallbackStoreHost = new ThreadLocal<>();
+    private final ThreadLocal<String> mftCallbackStoreHost = new ThreadLocal<>();
 
     @TaskParam(name = "MFTCallbackStorePort")
-    private ThreadLocal<Integer> mftCallbackStorePort = new ThreadLocal<>();
+    private final ThreadLocal<Integer> mftCallbackStorePort = new ThreadLocal<>();
 
 
     public TaskResult beforeSection() {
@@ -92,6 +95,7 @@ public class AsyncDataTransferTask extends BiSectionNonBlockingTask {
 
     public TaskResult afterSection() {
         logger.info("Transfer completed successfully");
+        // TODO update metadata into Datalake
         return new TaskResult(TaskResult.Status.COMPLETED, "Section 2 completed");
     }