Posted to commits@airavata.apache.org by di...@apache.org on 2021/06/02 09:11:14 UTC

[airavata-data-lake] branch master updated: Adding an API to fetch MFT callback entries and implementing first part of non blocking MFT transfers

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new bf113f9  Adding an API to fetch MFT callback entries and implementing first part of non blocking MFT transfers
bf113f9 is described below

commit bf113f96a0e026788f33218dd14c11d170b2bd8e
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Jun 2 05:10:01 2021 -0400

    Adding an API to fetch MFT callback entries and implementing first part of non blocking MFT transfers
---
 data-orchestrator/workflow-engine/README.md        |   6 +-
 data-orchestrator/workflow-engine/pom.xml          |  21 ++-
 .../engine/services/handler/APIRunner.java         |  27 ---
 .../services/handler/WorkflowEngineAPIHandler.java |  38 ----
 .../services/wm/DataSyncWorkflowManager.java       |  91 ----------
 .../engine/services/wm/PreWorkflowManager.java     |  47 -----
 .../engine/client/WorkflowEngineClient.java        |  24 +++
 .../pom.xml                                        |  14 +-
 .../workflow/engine/monitor/AsyncEventMonitor.java |  16 ++
 .../engine/monitor/EventMonitorConfig.java}        |  12 +-
 .../monitor/filter/mft/DataTransferEvent.java}     |  34 ++--
 .../filter/mft/DataTransferEventDeserializer.java} |  31 ++--
 .../filter/mft/DataTransferEventSerializer.java}   |  27 +--
 .../filter/mft/KafkaMFTMessageProducer.java        |  62 +++++++
 .../engine/monitor/filter/mft/MFTFilter.java       |  68 ++++++++
 .../engine/services/controller/Controller.java     |  12 +-
 .../engine/services/handler/APIRunner.java         |  50 ++++++
 .../engine/services/handler/AppConfig.java}        |  35 ++--
 .../services/handler/WorkflowEngineAPIHandler.java |  87 ++++++++++
 .../engine/services/participant/Participant.java   |  13 +-
 .../services/wm/CallbackWorkflowEntity.java}       |  54 ++++--
 .../engine/services/wm/CallbackWorkflowStore.java} |  28 ++-
 .../engine/services/wm/WorkflowOperator.java       |  25 ++-
 .../wm/datasync/DataSyncWorkflowManager.java       | 179 +++++++++++++++++++
 .../services/wm/datasync/MFTCallbackEntity.java}   |  41 +++--
 .../services/wm/datasync/MFTCallbackStore.java}    |  39 ++---
 .../workflow/engine/task/AbstractTask.java         |   0
 .../engine/task/BiSectionNonBlockingTask.java}     |  24 ++-
 .../workflow/engine/task/BlockingTask.java         |   0
 .../workflow/engine/task/NonBlockingTask.java      |   0
 .../orchestrator/workflow/engine/task/OutPort.java |   0
 .../workflow/engine/task/TaskParamType.java        |   0
 .../engine/task/annotation/BlockingTaskDef.java    |   0
 .../engine/task/annotation/NonBlockingSection.java |   0
 .../engine/task/annotation/NonBlockingTaskDef.java |   0
 .../engine/task/annotation/TaskOutPort.java        |   0
 .../workflow/engine/task/annotation/TaskParam.java |   0
 .../engine/task/impl/AsyncDataTransferTask.java    | 193 +++++++++++++++++++++
 .../engine/task/impl/ExampleBlockingTask.java      |   0
 .../engine/task/impl/ExampleNonBlockingTask.java   |   0
 .../src/main/resources/application.properties      |  12 +-
 .../src/main/resources/logback.xml                 |   0
 .../src/main/resources/task-list.yaml              |   3 +-
 .../src/main/proto/service/WorkflowService.proto   |  21 ++-
 pom.xml                                            |   2 +-
 45 files changed, 961 insertions(+), 375 deletions(-)

diff --git a/data-orchestrator/workflow-engine/README.md b/data-orchestrator/workflow-engine/README.md
index 94bc9eb..712de75 100644
--- a/data-orchestrator/workflow-engine/README.md
+++ b/data-orchestrator/workflow-engine/README.md
@@ -1,8 +1,8 @@
 ### Service Execution Order
 
-* Controller
-* Participant
-* DataSyncWorkflowManager
+* org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller.Controller
+* org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant.Participant
+* org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.DataSyncWorkflowManager
 
 ### Configure the participant with new tasks
 
diff --git a/data-orchestrator/workflow-engine/pom.xml b/data-orchestrator/workflow-engine/pom.xml
index a19564e..4b0c738 100644
--- a/data-orchestrator/workflow-engine/pom.xml
+++ b/data-orchestrator/workflow-engine/pom.xml
@@ -31,7 +31,7 @@
     <modelVersion>4.0.0</modelVersion>
     <packaging>pom</packaging>
     <modules>
-        <module>workflow-engine-api</module>
+        <module>workflow-engine-core</module>
         <module>workflow-engine-stubs</module>
         <module>workflow-engine-client</module>
     </modules>
@@ -56,6 +56,10 @@
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -64,6 +68,11 @@
             <version>${spring.boot.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+            <version>${spring.boot.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>
             <version>${log4j.over.slf4j}</version>
@@ -78,5 +87,15 @@
             <artifactId>commons-beanutils</artifactId>
             <version>${commons.beanutils.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>2.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>mft-api-client</artifactId>
+            <version>0.01-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/APIRunner.java b/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/APIRunner.java
deleted file mode 100644
index 6d69a39..0000000
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/APIRunner.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler;
-
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.domain.EntityScan;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
-import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
-
-@ComponentScan(basePackages = {"org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler",
-        "org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm"})
-@SpringBootApplication()
-@EnableJpaAuditing
-@EnableJpaRepositories("org.apache.airavata.datalake")
-@EntityScan("org.apache.airavata.datalake")
-public class APIRunner implements CommandLineRunner {
-    public static void main(String[] args) {
-        SpringApplication.run(APIRunner.class, args);
-    }
-
-    @Override
-    public void run(String... args) throws Exception {
-        System.out.println("RUnning");
-    }
-
-}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java b/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
deleted file mode 100644
index 7f760fa..0000000
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler;
-
-import io.grpc.stub.StreamObserver;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationResponse;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.PreWorkflowManager;
-import org.lognet.springboot.grpc.GRpcService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-@GRpcService
-public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServiceImplBase {
-    private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowEngineAPIHandler.class);
-
-    private PreWorkflowManager preworkflowManager;
-
-
-    @Autowired
-    public WorkflowEngineAPIHandler(PreWorkflowManager preworkflowManager) {
-        this.preworkflowManager = preworkflowManager;
-    }
-
-    @Override
-    public void invokeWorkflow(WorkflowInvocationRequest request,
-                               StreamObserver<WorkflowInvocationResponse> responseObserver) {
-        try {
-            preworkflowManager.launchWorkflow(request.getMessage().getMessageId());
-
-
-        } catch (Exception ex) {
-            String msg = "Error occurred while invoking blocking pipeline";
-            LOGGER.error(msg, ex);
-            throw new RuntimeException(msg, ex);
-        }
-    }
-}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
deleted file mode 100644
index 9f08751..0000000
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
-
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.ComponentScan;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-@SpringBootApplication()
-@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm")
-public class DataSyncWorkflowManager implements CommandLineRunner {
-
-    private final static Logger logger = LoggerFactory.getLogger(DataSyncWorkflowManager.class);
-
-    @org.springframework.beans.factory.annotation.Value("${cluster.name}")
-    private String clusterName;
-
-    @org.springframework.beans.factory.annotation.Value("${datasync.wm.name}")
-    private String workflowManagerName;
-
-    @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
-    private String zkAddress;
-
-    @Override
-    public void run(String... args) throws Exception {
-        WorkflowOperator workflowOperator = new WorkflowOperator();
-        workflowOperator.init(clusterName, workflowManagerName, zkAddress);
-
-        ExampleBlockingTask bt1 = new ExampleBlockingTask();
-        bt1.setTaskId("bt1-" + UUID.randomUUID());
-
-        ExampleBlockingTask bt2 = new ExampleBlockingTask();
-        bt2.setTaskId("bt2-" + UUID.randomUUID());
-
-        ExampleBlockingTask bt3 = new ExampleBlockingTask();
-        bt3.setTaskId("bt3-" + UUID.randomUUID());
-
-        ExampleBlockingTask bt4 = new ExampleBlockingTask();
-        bt4.setTaskId("bt4-" + UUID.randomUUID());
-
-        ExampleNonBlockingTask nbt1 = new ExampleNonBlockingTask();
-        nbt1.setTaskId("nbt1-" + UUID.randomUUID());
-        nbt1.setCurrentSection(2);
-
-        // Setting dependency
-        bt1.setOutPort(new OutPort().setNextTaskId(nbt1.getTaskId()));
-        //bt2.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
-        //bt4.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
-
-        Map<String, AbstractTask> taskMap = new HashMap<>();
-        taskMap.put(bt1.getTaskId(), bt1);
-        taskMap.put(nbt1.getTaskId(), nbt1);
-        //taskMap.put(bt2.getTaskId(), bt2);
-        //taskMap.put(bt3.getTaskId(), bt3);
-        //taskMap.put(bt4.getTaskId(), bt4);
-        //String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()};
-        String[] startTaskIds = {bt1.getTaskId()};
-        String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
-        logger.info("Launched workflow {}", workflowId);
-    }
-
-    public static void main(String args[]) throws Exception {
-        SpringApplication.run(DataSyncWorkflowManager.class);
-    }
-}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/PreWorkflowManager.java b/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/PreWorkflowManager.java
deleted file mode 100644
index 366c6ab..0000000
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/PreWorkflowManager.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
-
-import org.apache.airavata.datalake.orchestrator.registry.persistance.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.Optional;
-import java.util.UUID;
-
-/**
- * A class responsible to create task DAG and launch experiment
- */
-@Component
-public class PreWorkflowManager {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(PreWorkflowManager.class);
-
-    private DataOrchestratorEventRepository dataOrchestratorEventRepository;
-
-    private WorkflowEntityRepository workflowEntityRepository;
-
-    @Autowired
-    public PreWorkflowManager(DataOrchestratorEventRepository dataOrchestratorEventRepository,
-                              WorkflowEntityRepository workflowEntityRepository) {
-        this.dataOrchestratorEventRepository = dataOrchestratorEventRepository;
-        this.workflowEntityRepository = workflowEntityRepository;
-    }
-
-    public boolean launchWorkflow(String msgId) {
-
-        Optional<DataOrchestratorEntity> dataOrchestratorEntity = dataOrchestratorEventRepository.findById(msgId);
-        dataOrchestratorEntity.ifPresent(enty -> {
-            String workflowId = "WORKFLOW_" + UUID.randomUUID().toString();
-            WorkflowEntity workFlowEntity = new WorkflowEntity();
-            workFlowEntity.setId(workflowId);
-            workFlowEntity.setDataOrchestratorEntity(enty);
-            workFlowEntity.setStatus(EntityStatus.WORKFLOW_LAUNCHED.name());
-            workflowEntityRepository.save(workFlowEntity);
-        });
-
-        return true;
-    }
-
-
-}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-client/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/client/WorkflowEngineClient.java b/data-orchestrator/workflow-engine/workflow-engine-client/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/client/WorkflowEngineClient.java
index cfec521..040ce39 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-client/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/client/WorkflowEngineClient.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-client/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/client/WorkflowEngineClient.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.airavata.datalake.orchestrator.workflow.engine.client;
 
 import io.grpc.ManagedChannel;
@@ -7,6 +24,12 @@ import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowMessage
 import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
 
 public class WorkflowEngineClient {
+
+    public static WorkflowServiceGrpc.WorkflowServiceBlockingStub buildClient(String host, int port) {
+        ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
+        return WorkflowServiceGrpc.newBlockingStub(channel);
+    }
+
     public static void main(String[] args) {
         ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565).usePlaintext().build();
         WorkflowServiceGrpc.WorkflowServiceBlockingStub workflowServiceStub =
@@ -16,5 +39,6 @@ public class WorkflowEngineClient {
                 .setMessage(WorkflowMessage.newBuilder().setMessageId("387bd8e9-58ce-4626-8347-90a8ab12376e "))
                 .build();
         workflowServiceStub.invokeWorkflow(workflowInvocationRequest);
+        System.out.println("Done");
     }
 }
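
The new static buildClient(host, port) helper gives other services (the MFT filter introduced later in this commit uses it) a blocking stub without wiring the gRPC channel manually. A minimal usage sketch, assuming a workflow engine gRPC server is listening on localhost:6565 as in the main method above; the message id is a placeholder, not a value from this commit:

    WorkflowServiceGrpc.WorkflowServiceBlockingStub stub =
            WorkflowEngineClient.buildClient("localhost", 6565);
    WorkflowInvocationRequest request = WorkflowInvocationRequest.newBuilder()
            .setMessage(WorkflowMessage.newBuilder().setMessageId("example-message-id")) // placeholder id
            .build();
    stub.invokeWorkflow(request); // with this commit, the server side submits the data sync workflow
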
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/pom.xml b/data-orchestrator/workflow-engine/workflow-engine-core/pom.xml
similarity index 88%
rename from data-orchestrator/workflow-engine/workflow-engine-api/pom.xml
rename to data-orchestrator/workflow-engine/workflow-engine-core/pom.xml
index 594c438..56a0a07 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/pom.xml
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/pom.xml
@@ -11,7 +11,7 @@
 
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>workflow-engine-api</artifactId>
+    <artifactId>workflow-engine-core</artifactId>
 
 
     <dependencies>
@@ -63,7 +63,7 @@
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter</artifactId>
-            <version>2.4.3</version>
+            <version>${spring.boot.version}</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
@@ -97,6 +97,16 @@
             <version>1.27</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+            <version>2.3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata.data.lake</groupId>
+            <artifactId>workflow-engine-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <properties>
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
similarity index 50%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
index 4afa566..7ff64bf 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
@@ -17,5 +17,21 @@
 
 package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor;
 
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
+import org.springframework.context.annotation.ComponentScan;
+
+@SpringBootApplication
+@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.monitor")
+@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class,
+        DataSourceTransactionManagerAutoConfiguration.class,
+        HibernateJpaAutoConfiguration.class})
 public class AsyncEventMonitor {
+    public static void main(String[] args) {
+        SpringApplication.run(AsyncEventMonitor.class, args);
+    }
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/EventMonitorConfig.java
similarity index 67%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/EventMonitorConfig.java
index 4afa566..a1d7d9b 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/EventMonitorConfig.java
@@ -17,5 +17,15 @@
 
 package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor;
 
-public class AsyncEventMonitor {
+import org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft.KafkaMFTMessageProducer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class EventMonitorConfig {
+
+    @Bean(initMethod = "init")
+    public KafkaMFTMessageProducer kafkaMFTMessageProducer() {
+        return new KafkaMFTMessageProducer();
+    }
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEvent.java
similarity index 58%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEvent.java
index 9033f1a..ffa0dab 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEvent.java
@@ -15,28 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft;
 
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+public class DataTransferEvent {
+    private String taskId;
+    private String workflowId;
+    private String transferStatus;
 
-public abstract class BlockingTask extends AbstractTask {
-
-    private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
+    public String getTaskId() {
+        return taskId;
+    }
 
-    public BlockingTask() {
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
     }
 
-    @Override
-    public TaskResult onRun() {
-        return runBlockingCode();
+    public String getWorkflowId() {
+        return workflowId;
     }
 
-    public abstract TaskResult runBlockingCode();
+    public void setWorkflowId(String workflowId) {
+        this.workflowId = workflowId;
+    }
 
-    @Override
-    public void onCancel() {
+    public String getTransferStatus() {
+        return transferStatus;
+    }
 
+    public void setTransferStatus(String transferStatus) {
+        this.transferStatus = transferStatus;
     }
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEventDeserializer.java
similarity index 62%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEventDeserializer.java
index 9033f1a..12e2066 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEventDeserializer.java
@@ -15,28 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft;
 
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class BlockingTask extends AbstractTask {
-
-    private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
-
-    public BlockingTask() {
-    }
+import org.apache.kafka.common.serialization.Deserializer;
 
+public class DataTransferEventDeserializer implements Deserializer<DataTransferEvent> {
     @Override
-    public TaskResult onRun() {
-        return runBlockingCode();
-    }
-
-    public abstract TaskResult runBlockingCode();
-
-    @Override
-    public void onCancel() {
-
+    public DataTransferEvent deserialize(String s, byte[] bytes) {
+        String deserializedData = new String(bytes);
+        String[] parts = deserializedData.split("/");
+        DataTransferEvent dte = new DataTransferEvent();
+        dte.setTaskId(parts[0]);
+        dte.setWorkflowId(parts[1]);
+        dte.setTransferStatus(parts[2]);
+        return dte;
     }
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEventSerializer.java
similarity index 63%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEventSerializer.java
index 9033f1a..c3a2d35 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEventSerializer.java
@@ -15,28 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft;
 
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kafka.common.serialization.Serializer;
 
-public abstract class BlockingTask extends AbstractTask {
+import java.nio.charset.StandardCharsets;
 
-    private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
-
-    public BlockingTask() {
-    }
+public class DataTransferEventSerializer implements Serializer<DataTransferEvent> {
 
     @Override
-    public TaskResult onRun() {
-        return runBlockingCode();
-    }
-
-    public abstract TaskResult runBlockingCode();
-
-    @Override
-    public void onCancel() {
-
+    public byte[] serialize(String s, DataTransferEvent dataTransferEvent) {
+        String data = dataTransferEvent.getTaskId() +
+                "/" + dataTransferEvent.getWorkflowId() +
+                "/" + dataTransferEvent.getTransferStatus();
+        return data.getBytes(StandardCharsets.UTF_8);
     }
 }
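
The serializer/deserializer pair above agrees on a plain "/"-delimited wire format, taskId/workflowId/transferStatus, so the individual fields must not themselves contain a "/"; the topic name argument is ignored by both implementations. A round-trip sketch with illustrative values:

    DataTransferEvent event = new DataTransferEvent();
    event.setTaskId("task-1");            // illustrative ids, not values from this commit
    event.setWorkflowId("wf-1");
    event.setTransferStatus("COMPLETED");

    byte[] payload = new DataTransferEventSerializer().serialize("mft-status", event);
    // payload is the UTF-8 bytes of "task-1/wf-1/COMPLETED"
    DataTransferEvent restored = new DataTransferEventDeserializer().deserialize("mft-status", payload);
    // restored carries the same taskId, workflowId and transferStatus as the original event
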
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/KafkaMFTMessageProducer.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/KafkaMFTMessageProducer.java
new file mode 100644
index 0000000..c3b257c
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/KafkaMFTMessageProducer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft;
+
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+public class KafkaMFTMessageProducer {
+
+    @org.springframework.beans.factory.annotation.Value("${kafka.url}")
+    private String kafkaUrl;
+
+    @org.springframework.beans.factory.annotation.Value("${kafka.mft.publisher.name}")
+    private String publisherName;
+
+    @org.springframework.beans.factory.annotation.Value("${kafka.mft.status.publish.topic}")
+    private String topic;
+
+    private Producer<String, DataTransferEvent> producer;
+
+    public void init() {
+        this.producer = createProducer();
+    }
+
+    private Producer<String, DataTransferEvent> createProducer() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, publisherName);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                DataTransferEventSerializer.class.getName());
+        return new KafkaProducer<String, DataTransferEvent>(props);
+    }
+
+    public void publish(DataTransferEvent event) throws ExecutionException, InterruptedException {
+        final ProducerRecord<String, DataTransferEvent> record = new ProducerRecord<>(
+                topic,
+                event.getTaskId(),
+                event);
+        RecordMetadata recordMetadata = producer.send(record).get();
+        producer.flush();
+    }
+}
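
The underlying KafkaProducer is created in init() rather than in the constructor so that the @Value properties are resolved first; EventMonitorConfig above registers the class as a bean with initMethod = "init" for exactly that reason. Note that publish() is synchronous: producer.send(record).get() blocks until the broker acknowledges the record. A hedged usage sketch, assuming the bean is injected by Spring and kafka.url points at a reachable broker; the wrapper method is illustrative, not part of this commit:

    @Autowired
    private KafkaMFTMessageProducer mftMessageProducer;

    void reportTransfer(String taskId, String workflowId, String status) throws Exception {
        DataTransferEvent event = new DataTransferEvent();
        event.setTaskId(taskId);
        event.setWorkflowId(workflowId);
        event.setTransferStatus(status);
        mftMessageProducer.publish(event); // record is keyed by taskId, see createProducer() above
    }
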
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/MFTFilter.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/MFTFilter.java
new file mode 100644
index 0000000..6c76175
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/MFTFilter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.MFTCallbackGetRequest;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.MFTCallbackGetResponse;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.client.WorkflowEngineClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping(path = "/mft")
+public class MFTFilter {
+
+    private final static Logger logger = LoggerFactory.getLogger(MFTFilter.class);
+
+    @Autowired
+    private KafkaMFTMessageProducer mftMessageProducer;
+
+    @org.springframework.beans.factory.annotation.Value("${datasync.wm.grpc.host}")
+    private String datasyncWmHost;
+
+    @org.springframework.beans.factory.annotation.Value("${datasync.wm.grpc.port}")
+    private int datasyncWmPort;
+
+    @GetMapping(value = "/{transferId}/{status}")
+    public String fetchSFTPRemote(@PathVariable(name = "transferId") String transferId,
+                                  @PathVariable(name = "status") String status) {
+
+        logger.info("Got transfer status {} for transfer id {}", status, transferId);
+
+        WorkflowServiceGrpc.WorkflowServiceBlockingStub wmClient = WorkflowEngineClient.buildClient(datasyncWmHost, datasyncWmPort);
+        MFTCallbackGetResponse mftCallback = wmClient.getMFTCallback(
+                MFTCallbackGetRequest.newBuilder().setMftTransferId(transferId).build());
+
+        DataTransferEvent dataTransferEvent = new DataTransferEvent();
+        dataTransferEvent.setTransferStatus(status);
+        dataTransferEvent.setTaskId(mftCallback.getTaskId());
+        dataTransferEvent.setWorkflowId(mftCallback.getWorkflowId());
+        try {
+            mftMessageProducer.publish(dataTransferEvent);
+        } catch (Exception e) {
+            logger.error("Failed to publish even to kafka with transfer id {} and status {}", transferId, status);
+        }
+        return "OK";
+    }
+}
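
MFTFilter is the HTTP callback endpoint MFT is expected to hit when a transfer changes state, GET /mft/{transferId}/{status}: it resolves the transfer id to a workflow/task pair through the DataSync workflow manager's gRPC API and republishes the status to Kafka. A hedged example of exercising the endpoint from plain Java with the JDK 11 HttpClient; host, port and transfer id are placeholders, not values from this commit:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class MFTCallbackProbe {
        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8080/mft/example-transfer-id/COMPLETED")) // placeholders
                    .GET()
                    .build();
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body()); // the handler returns the plain string "OK"
        }
    }
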
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
similarity index 83%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
index 83e6af5..bcbba4a 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
@@ -25,12 +25,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
 import org.springframework.context.annotation.ComponentScan;
 
 import java.util.concurrent.CountDownLatch;
 
 @SpringBootApplication()
+@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class,
+        DataSourceTransactionManagerAutoConfiguration.class,
+        HibernateJpaAutoConfiguration.class})
 @ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller")
 public class Controller implements CommandLineRunner {
 
@@ -89,6 +97,8 @@ public class Controller implements CommandLineRunner {
     }
 
     public static void main(String args[]) throws Exception {
-        SpringApplication.run(Controller.class);
+        SpringApplication app = new SpringApplication(Controller.class);
+        app.setWebApplicationType(WebApplicationType.NONE);
+        app.run(args);
     }
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/APIRunner.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/APIRunner.java
new file mode 100644
index 0000000..329bc40
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/APIRunner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
+import org.springframework.context.annotation.ComponentScan;
+
+@ComponentScan(basePackages = {"org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler",
+        "org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm"})
+@SpringBootApplication()
+@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class,
+        DataSourceTransactionManagerAutoConfiguration.class,
+        HibernateJpaAutoConfiguration.class})
+/*@EnableJpaAuditing
+@EnableJpaRepositories("org.apache.airavata.datalake")
+@EntityScan("org.apache.airavata.datalake")*/
+public class APIRunner implements CommandLineRunner {
+    public static void main(String[] args) {
+        SpringApplication app = new SpringApplication(APIRunner.class);
+        app.setWebApplicationType(WebApplicationType.NONE);
+        app.run(args);
+    }
+
+    @Override
+    public void run(String... args) throws Exception {
+        System.out.println("Running");
+    }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/AppConfig.java
similarity index 60%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/AppConfig.java
index 0bdd9c9..ece0005 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/AppConfig.java
@@ -15,29 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler;
 
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
-import org.apache.helix.task.TaskResult;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.CallbackWorkflowStore;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.DataSyncWorkflowManager;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.MFTCallbackStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
 
-@NonBlockingTaskDef(name = "ExampleNonBlockingTask")
-public class ExampleNonBlockingTask extends NonBlockingTask {
+@Configuration
+public class AppConfig {
+    private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
 
-    private final static Logger logger = LoggerFactory.getLogger(ExampleNonBlockingTask.class);
+    @Bean(initMethod = "init")
+    public DataSyncWorkflowManager dataSyncWorkflowManager() {
+        return new DataSyncWorkflowManager();
+    }
 
-    @NonBlockingSection(sectionIndex = 1)
-    public TaskResult section1() {
-        logger.info("Running section 1");
-        return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+    @Bean
+    public CallbackWorkflowStore callbackWorkflowStore() {
+        return new CallbackWorkflowStore();
     }
 
-    @NonBlockingSection(sectionIndex = 2)
-    public TaskResult section2() {
-        logger.info("Running section 2");
-        return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+    @Bean
+    public MFTCallbackStore mftCallbackStore() {
+        return new MFTCallbackStore();
     }
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
new file mode 100644
index 0000000..c4e44c1
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler;
+
+import com.google.protobuf.Empty;
+import io.grpc.stub.StreamObserver;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.*;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.CallbackWorkflowStore;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.DataSyncWorkflowManager;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.MFTCallbackEntity;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.MFTCallbackStore;
+import org.dozer.DozerBeanMapper;
+import org.lognet.springboot.grpc.GRpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.Optional;
+
+@GRpcService
+public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServiceImplBase {
+    private static final Logger logger = LoggerFactory.getLogger(WorkflowEngineAPIHandler.class);
+
+    @Autowired
+    private MFTCallbackStore mftCallbackStore;
+
+    @Autowired
+    private CallbackWorkflowStore callbackWorkflowStore;
+
+    @Autowired
+    private DataSyncWorkflowManager dataSyncWorkflowManager;
+
+    @Override
+    public void invokeWorkflow(WorkflowInvocationRequest request,
+                               StreamObserver<WorkflowInvocationResponse> responseObserver) {
+        try {
+            dataSyncWorkflowManager.submitDataSyncWorkflow();
+            responseObserver.onNext(WorkflowInvocationResponse.newBuilder().setStatus(true).build());
+            responseObserver.onCompleted();
+        } catch (Exception ex) {
+            String msg = "Error occurred while invoking blocking pipeline";
+            logger.error(msg, ex);
+            responseObserver.onError(new Exception(msg));
+        }
+    }
+
+    @Override
+    public void getMFTCallback(MFTCallbackGetRequest request, StreamObserver<MFTCallbackGetResponse> responseObserver) {
+        DozerBeanMapper mapper = new DozerBeanMapper();
+        Optional<MFTCallbackEntity> mftCallbackOp = mftCallbackStore.findMFTCallback(request.getMftTransferId());
+        if (mftCallbackOp.isPresent()) {
+            MFTCallbackEntity callbackEntity = mftCallbackOp.get();
+            MFTCallbackGetResponse.Builder builder = MFTCallbackGetResponse.newBuilder();
+            mapper.map(callbackEntity, builder);
+            responseObserver.onNext(builder.build());
+            responseObserver.onCompleted();
+        } else {
+            logger.error("No callback entity for mft transfer id {}", request.getMftTransferId());
+            responseObserver.onError(new Exception("No callback entity for mft transfer id " + request.getMftTransferId()));
+        }
+    }
+
+    @Override
+    public void saveMFTCallback(MFTCallbacSaveRequest request, StreamObserver<Empty> responseObserver) {
+        DozerBeanMapper mapper = new DozerBeanMapper();
+        MFTCallbackEntity entity = mapper.map(request, MFTCallbackEntity.class);
+        mftCallbackStore.saveMFTCallBack(entity);
+        logger.info("Saved callback entity for transfer id {}", request.getMftTransferId());
+        responseObserver.onNext(Empty.newBuilder().build());
+        responseObserver.onCompleted();
+    }
+}
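
getMFTCallback is what the event monitor's MFTFilter calls to map a raw MFT transfer id back to the task and workflow waiting on it; saveMFTCallback is the corresponding write side that stores that mapping in the MFTCallbackStore. A hedged lookup sketch using the blocking stub helper from WorkflowEngineClient; host, port and transfer id are placeholders:

    WorkflowServiceGrpc.WorkflowServiceBlockingStub stub =
            WorkflowEngineClient.buildClient("localhost", 6565);          // placeholder host/port
    MFTCallbackGetResponse callback = stub.getMFTCallback(
            MFTCallbackGetRequest.newBuilder()
                    .setMftTransferId("example-transfer-id")              // placeholder transfer id
                    .build());
    // callback.getTaskId() and callback.getWorkflowId() identify the non-blocking task to resume
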
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
similarity index 93%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
index 65314ee..69ecaf3 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
@@ -17,6 +17,7 @@
 
 package org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant;
 
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller.Controller;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
@@ -36,7 +37,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
 import org.springframework.context.annotation.ComponentScan;
 import org.yaml.snakeyaml.Yaml;
 
@@ -44,6 +50,9 @@ import java.io.*;
 import java.util.*;
 
 @SpringBootApplication()
+@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class,
+        DataSourceTransactionManagerAutoConfiguration.class,
+        HibernateJpaAutoConfiguration.class})
 @ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant")
 public class Participant implements CommandLineRunner {
 
@@ -268,6 +277,8 @@ public class Participant implements CommandLineRunner {
     }
 
     public static void main(String args[]) throws Exception {
-        SpringApplication.run(Participant.class);
+        SpringApplication app = new SpringApplication(Participant.class);
+        app.setWebApplicationType(WebApplicationType.NONE);
+        app.run(args);
     }
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java
similarity index 50%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java
index 0bdd9c9..7a14573 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java
@@ -15,29 +15,47 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
 
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
 
-@NonBlockingTaskDef(name = "ExampleNonBlockingTask")
-public class ExampleNonBlockingTask extends NonBlockingTask {
+import java.util.Map;
 
-    private final static Logger logger = LoggerFactory.getLogger(ExampleNonBlockingTask.class);
+public class CallbackWorkflowEntity {
+    private int prevSectionIndex;
+    private Map<String, AbstractTask> taskMap;
+    private String workflowId;
+    private String startTaskId;
 
-    @NonBlockingSection(sectionIndex = 1)
-    public TaskResult section1() {
-        logger.info("Running section 1");
-        return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+    public int getPrevSectionIndex() {
+        return prevSectionIndex;
     }
 
-    @NonBlockingSection(sectionIndex = 2)
-    public TaskResult section2() {
-        logger.info("Running section 2");
-        return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+    public void setPrevSectionIndex(int prevSectionIndex) {
+        this.prevSectionIndex = prevSectionIndex;
+    }
+
+    public Map<String, AbstractTask> getTaskMap() {
+        return taskMap;
+    }
+
+    public void setTaskMap(Map<String, AbstractTask> taskMap) {
+        this.taskMap = taskMap;
+    }
+
+    public String getWorkflowId() {
+        return workflowId;
+    }
+
+    public void setWorkflowId(String workflowId) {
+        this.workflowId = workflowId;
+    }
+
+    public String getStartTaskId() {
+        return startTaskId;
+    }
+
+    public void setStartTaskId(String startTaskId) {
+        this.startTaskId = startTaskId;
     }
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowStore.java
similarity index 58%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowStore.java
index 9033f1a..0f6ef8a 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowStore.java
@@ -15,28 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
 
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.*;
 
-public abstract class BlockingTask extends AbstractTask {
+public class CallbackWorkflowStore {
 
-    private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
+    private final List<CallbackWorkflowEntity> workflowEntities = new ArrayList<>();
 
-    public BlockingTask() {
+    public void saveWorkflowEntity(CallbackWorkflowEntity cwe) {
+        workflowEntities.add(cwe);
     }
 
-    @Override
-    public TaskResult onRun() {
-        return runBlockingCode();
-    }
-
-    public abstract TaskResult runBlockingCode();
-
-    @Override
-    public void onCancel() {
-
+    public Optional<CallbackWorkflowEntity> getWorkflowEntity(String workflowId, String taskId, int prevSectionIndex) {
+        return workflowEntities.stream().filter(e ->
+                e.getWorkflowId().equals(workflowId) &&
+                        e.getStartTaskId().equals(taskId) &&
+                        e.getPrevSectionIndex() == prevSectionIndex).findFirst();
     }
 }
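
For reference, a minimal usage sketch of the new in-memory CallbackWorkflowStore: one entry is recorded when the first section of a non-blocking task completes, and is later looked up by workflow id, start task id and the section index that has already run. All ids below are hypothetical.

    import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.CallbackWorkflowEntity;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.CallbackWorkflowStore;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;

    import java.util.HashMap;
    import java.util.Map;

    public class CallbackWorkflowStoreSketch {
        public static void main(String[] args) {
            CallbackWorkflowStore store = new CallbackWorkflowStore();

            Map<String, AbstractTask> taskMap = new HashMap<>(); // tasks of the parked workflow

            CallbackWorkflowEntity entity = new CallbackWorkflowEntity();
            entity.setWorkflowId("Workflow_dt-1234");   // hypothetical workflow id
            entity.setStartTaskId("dt-1234");           // hypothetical non-blocking task id
            entity.setPrevSectionIndex(1);              // section 1 has already completed
            entity.setTaskMap(taskMap);
            store.saveWorkflowEntity(entity);

            // A later callback can locate the parked workflow and continue from section 2.
            store.getWorkflowEntity("Workflow_dt-1234", "dt-1234", 1)
                    .ifPresent(e -> System.out.println("Resume " + e.getWorkflowId()
                            + " at section " + (e.getPrevSectionIndex() + 1)));
        }
    }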
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
similarity index 90%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
index ba3ef58..4eee6f1 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
@@ -47,8 +47,11 @@ public class WorkflowOperator {
 
     private TaskDriver taskDriver;
     private HelixManager helixManager;
+    private CallbackWorkflowStore cbws;
 
-    public void init(String clusterName, String workflowManagerName, String zkAddress) throws Exception {
+    public void init(String clusterName, String workflowManagerName, String zkAddress, CallbackWorkflowStore cbws)
+            throws Exception {
+        this.cbws = cbws;
         helixManager = HelixManagerFactory.getZKHelixManager(clusterName, workflowManagerName,
                 InstanceType.SPECTATOR, zkAddress);
         helixManager.connect();
@@ -83,7 +86,7 @@ public class WorkflowOperator {
         Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName).setExpiry(0);
 
         for (String startTaskId: startTaskIds) {
-            buildWorkflowRecursively(workflowBuilder, startTaskId, taskMap);
+            buildWorkflowRecursively(workflowBuilder, workflowName, startTaskId, taskMap);
         }
 
         WorkflowConfig.Builder config = new WorkflowConfig.Builder()
@@ -98,11 +101,19 @@ public class WorkflowOperator {
         return workflowName;
     }
 
-    private void continueNonBlockingRest(Map<String, AbstractTask> taskMap, String nonBlockingTaskId, int currentSection) {
+    private void continueNonBlockingRest(Map<String, AbstractTask> taskMap, String workflowName,
+                                         String nonBlockingTaskId, int currentSection) {
 
+        CallbackWorkflowEntity cwe = new CallbackWorkflowEntity();
+        cwe.setWorkflowId(workflowName);
+        cwe.setPrevSectionIndex(currentSection);
+        cwe.setTaskMap(taskMap);
+        cwe.setStartTaskId(nonBlockingTaskId);
+        this.cbws.saveWorkflowEntity(cwe);
     }
 
-    private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String nextTaskId, Map<String, AbstractTask> taskMap)
+    private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String workflowName,
+                                          String nextTaskId, Map<String, AbstractTask> taskMap)
             throws Exception{
         AbstractTask currentTask = taskMap.get(nextTaskId);
 
@@ -142,7 +153,7 @@ public class WorkflowOperator {
                 if (outPort != null) {
                     workflowBuilder.addParentChildDependency(currentTask.getTaskId(), outPort.getNextTaskId());
                     logger.info("Parent to child dependency {} -> {}", currentTask.getTaskId(), outPort.getNextTaskId());
-                    buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap);
+                    buildWorkflowRecursively(workflowBuilder, workflowName, outPort.getNextTaskId(), taskMap);
                 }
             }
         } else if (nonBlockingTaskDef != null) {
@@ -170,7 +181,7 @@ public class WorkflowOperator {
 
             workflowBuilder.addJob(currentTask.getTaskId(), job);
 
-            continueNonBlockingRest(taskMap, nextTaskId, nbTask.getCurrentSection());
+            continueNonBlockingRest(taskMap, workflowName, nextTaskId, nbTask.getCurrentSection());
         } else {
             logger.error("Couldn't find the task def annotation in class {}", currentTask.getClass().getName());
             throw new Exception("Couldn't find the task def annotation in class " + currentTask.getClass().getName());
@@ -204,7 +215,7 @@ public class WorkflowOperator {
                 TaskParam parm = classField.getAnnotation(TaskParam.class);
                 try {
                     if (parm != null) {
-                        Object propertyValue = PropertyUtils.getProperty(data, parm.name());
+                        Object propertyValue = PropertyUtils.getProperty(data, classField.getName());
                         if (propertyValue instanceof TaskParamType) {
                             result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize());
                         } else {
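
One behavioural detail of the WorkflowOperator change above: serializeTaskData now resolves the bean property via the declared field name (classField.getName()) rather than the @TaskParam name, while the serialized key still comes from the annotation. A task therefore needs getters that follow its field names. A hypothetical sketch (PathEchoTask is illustrative only):

    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
    import org.apache.helix.task.TaskResult;

    @BlockingTaskDef(name = "PathEchoTask")
    public class PathEchoTask extends BlockingTask {

        // Stored in the Helix job config under the key "SourcePath", but read off
        // this object through the getter matching the field name "sourcePath".
        @TaskParam(name = "SourcePath")
        private String sourcePath;

        public String getSourcePath() { return sourcePath; }

        public void setSourcePath(String sourcePath) { this.sourcePath = sourcePath; }

        @Override
        public TaskResult runBlockingCode() {
            return new TaskResult(TaskResult.Status.COMPLETED, "Echoed " + getSourcePath());
        }
    }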
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java
new file mode 100644
index 0000000..53350c4
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft.DataTransferEvent;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft.DataTransferEventDeserializer;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller.Controller;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.CallbackWorkflowStore;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.WorkflowOperator;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.AsyncDataTransferTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
+import org.springframework.context.annotation.ComponentScan;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.*;
+
+public class DataSyncWorkflowManager {
+
+    private final static Logger logger = LoggerFactory.getLogger(DataSyncWorkflowManager.class);
+
+    @org.springframework.beans.factory.annotation.Value("${cluster.name}")
+    private String clusterName;
+
+    @org.springframework.beans.factory.annotation.Value("${datasync.wm.name}")
+    private String workflowManagerName;
+
+    @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
+    private String zkAddress;
+
+    @org.springframework.beans.factory.annotation.Value("${kafka.url}")
+    private String kafkaUrl;
+
+    @org.springframework.beans.factory.annotation.Value("${kafka.mft.status.consumer.group}")
+    private String kafkaConsumerGroup;
+
+    @org.springframework.beans.factory.annotation.Value("${kafka.mft.status.publish.topic}")
+    private String kafkaTopic;
+
+    @org.springframework.beans.factory.annotation.Value("${datasync.wm.grpc.host}")
+    private String datasyncWmHost;
+
+    @org.springframework.beans.factory.annotation.Value("${datasync.wm.grpc.port}")
+    private int datasyncWmPort;
+
+    @Autowired
+    private CallbackWorkflowStore callbackWorkflowStore;
+
+    private ExecutorService kafkaMessageProcessPool = Executors.newFixedThreadPool(10);
+
+    private WorkflowOperator workflowOperator;
+
+    private Consumer<String, DataTransferEvent> createConsumer() {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DataTransferEventDeserializer.class.getName());
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
+        // Create the consumer using props.
+        final Consumer<String, DataTransferEvent> consumer = new KafkaConsumer<>(props);
+        // Subscribe to the topic.
+        consumer.subscribe(Collections.singletonList(kafkaTopic));
+        return consumer;
+    }
+
+    private boolean processKafkaMessage(DataTransferEvent dte) {
+        logger.info("Processing DTE for task {}, workflow {} and status {}",
+                dte.getTaskId(), dte.getWorkflowId(), dte.getTransferStatus());
+        return true;
+    }
+
+    private void runKafkaConsumer() {
+
+        final Consumer<String, DataTransferEvent> mftEventConsumer = createConsumer();
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        executorService.submit(() -> {
+            while (true) {
+                ConsumerRecords<String, DataTransferEvent> consumerRecords = mftEventConsumer.poll(Duration.ofMillis(1000));
+                if (!consumerRecords.isEmpty()) {
+
+                    CompletionService<Boolean> executorCompletionService = new ExecutorCompletionService<>(kafkaMessageProcessPool);
+
+                    List<Future<Boolean>> processingFutures = new ArrayList<>();
+
+                    for (TopicPartition partition : consumerRecords.partitions()) {
+                        List<ConsumerRecord<String, DataTransferEvent>> partitionRecords = consumerRecords.records(partition);
+                        for (ConsumerRecord<String, DataTransferEvent> record : partitionRecords) {
+                            processingFutures.add(executorCompletionService.submit(() -> {
+                                boolean success = processKafkaMessage(record.value());
+                                logger.info("Processing DTE for task " + record.value().getTaskId() + " : " + success);
+                                return success;
+                            }));
+
+                            mftEventConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
+                        }
+                    }
+
+                    for (Future<Boolean> f : processingFutures) {
+                        try {
+                            executorCompletionService.take().get();
+                        } catch (Exception e) {
+                            logger.error("Failed processing DTE", e);
+                        }
+                    }
+                    logger.info("All messages processed. Moving to next round");
+                }
+            }
+        });
+    }
+
+    public void init() throws Exception {
+        workflowOperator = new WorkflowOperator();
+        workflowOperator.init(clusterName, workflowManagerName, zkAddress, callbackWorkflowStore);
+        runKafkaConsumer();
+        logger.info("Successfully initialized DatasyncWorkflow Manager");
+    }
+
+    public void submitDataSyncWorkflow() throws Exception {
+        AsyncDataTransferTask dt1 = new AsyncDataTransferTask();
+        dt1.setSourceResourceId("");
+        dt1.setDestinationResourceId("");
+        dt1.setSourceCredToken("");
+        dt1.setDestinationCredToken("");
+        dt1.setCallbackUrl("localhost:33335");
+        dt1.setMftHost("localhost");
+        dt1.setMftPort(7004);
+        dt1.setMftClientId("");
+        dt1.setMftClientSecret("");
+        dt1.setUserId("dimuthu");
+        dt1.setCurrentSection(1);
+        dt1.setTaskId("dt-" + UUID.randomUUID().toString());
+        dt1.setMftCallbackStoreHost(datasyncWmHost);
+        dt1.setMftCallbackStorePort(datasyncWmPort);
+
+        Map<String, AbstractTask> taskMap = new HashMap<>();
+        taskMap.put(dt1.getTaskId(), dt1);
+        //taskMap.put(bt2.getTaskId(), bt2);
+        //taskMap.put(bt3.getTaskId(), bt3);
+        //taskMap.put(bt4.getTaskId(), bt4);
+        //String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()};
+        String[] startTaskIds = {dt1.getTaskId()};
+        String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
+        logger.info("Launched workflow {}", workflowId);
+    }
+}
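
processKafkaMessage above only logs the MFT status event for now; connecting it to the callback stores is the second half of the feature. A hedged sketch of what that step could look like inside DataSyncWorkflowManager, assuming the transfer status is a plain string whose terminal value is "COMPLETED" (not confirmed by this commit) and that actually resuming the parked workflow is handled elsewhere:

    // Hypothetical follow-up only; none of this resume wiring is part of this commit.
    private boolean processKafkaMessage(DataTransferEvent dte) {
        if (!"COMPLETED".equalsIgnoreCase(String.valueOf(dte.getTransferStatus()))) {
            logger.info("Ignoring non-terminal status {} for task {}",
                    dte.getTransferStatus(), dte.getTaskId());
            return true;
        }
        // Locate the workflow that was parked after section 1 of the transfer task.
        return callbackWorkflowStore
                .getWorkflowEntity(dte.getWorkflowId(), dte.getTaskId(), 1)
                .map(entity -> {
                    // Rebuilding and resubmitting the remaining sections from
                    // entity.getTaskMap() is the still-unimplemented part.
                    logger.info("Found parked workflow {} for task {}",
                            entity.getWorkflowId(), entity.getStartTaskId());
                    return true;
                })
                .orElse(false);
    }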
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/MFTCallbackEntity.java
similarity index 51%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/MFTCallbackEntity.java
index 9033f1a..b858350 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/MFTCallbackEntity.java
@@ -15,28 +15,43 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync;
 
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+public class MFTCallbackEntity {
+    private String mftTransferId;
+    private int prevSectionIndex;
+    private String workflowId;
+    private String taskId;
 
-public abstract class BlockingTask extends AbstractTask {
+    public String getMftTransferId() {
+        return mftTransferId;
+    }
+
+    public void setMftTransferId(String mftTransferId) {
+        this.mftTransferId = mftTransferId;
+    }
 
-    private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
+    public int getPrevSectionIndex() {
+        return prevSectionIndex;
+    }
 
-    public BlockingTask() {
+    public void setPrevSectionIndex(int prevSectionIndex) {
+        this.prevSectionIndex = prevSectionIndex;
     }
 
-    @Override
-    public TaskResult onRun() {
-        return runBlockingCode();
+    public String getWorkflowId() {
+        return workflowId;
     }
 
-    public abstract TaskResult runBlockingCode();
+    public void setWorkflowId(String workflowId) {
+        this.workflowId = workflowId;
+    }
 
-    @Override
-    public void onCancel() {
+    public String getTaskId() {
+        return taskId;
+    }
 
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
     }
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/MFTCallbackStore.java
similarity index 59%
copy from data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/MFTCallbackStore.java
index 12526a6..d17a891 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/MFTCallbackStore.java
@@ -15,31 +15,26 @@
  * limitations under the License.
  */
 
-syntax = "proto3";
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync;
 
-option java_multiple_files = true;
-package org.apache.airavata.datalake.orchestrator.workflow.engine;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 
-import "google/api/annotations.proto";
-import "Common.proto";
+// TODO: Fetch these from the database
+public class MFTCallbackStore {
 
-message WorkflowMessage {
-    string message_id = 1;
-    string resource_id = 2;
-}
+    private static final Map<String, MFTCallbackEntity> store = new HashMap<>();
 
-message WorkflowInvocationRequest {
-    org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken authToken = 1;
-    WorkflowMessage message = 2;
-}
+    public Optional<MFTCallbackEntity> findMFTCallback(String transferId) {
+        if (!store.containsKey(transferId)) {
+            return Optional.empty();
+        } else {
+            return Optional.of(store.get(transferId));
+        }
+    }
 
-message WorkflowInvocationResponse {
-    bool status = 1;
+    public void saveMFTCallBack(MFTCallbackEntity callbackEntity) {
+        store.put(callbackEntity.getMftTransferId(), callbackEntity);
+    }
 }
-
-
-service WorkflowService {
-
-    rpc invokeWorkflow (WorkflowInvocationRequest) returns (WorkflowInvocationResponse);
-
-}
\ No newline at end of file
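
MFTCallbackStore is a plain in-memory map keyed by the MFT transfer id (the TODO above notes it should eventually be database backed). A minimal sketch with hypothetical ids:

    import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.MFTCallbackEntity;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.MFTCallbackStore;

    public class MFTCallbackStoreSketch {
        public static void main(String[] args) {
            MFTCallbackStore store = new MFTCallbackStore();

            MFTCallbackEntity entity = new MFTCallbackEntity();
            entity.setMftTransferId("transfer-42");     // hypothetical MFT transfer id
            entity.setWorkflowId("Workflow_dt-1234");
            entity.setTaskId("dt-1234");
            entity.setPrevSectionIndex(1);
            store.saveMFTCallBack(entity);

            // An MFT status event carrying the transfer id maps back to the workflow task.
            store.findMFTCallback("transfer-42")
                    .ifPresent(e -> System.out.println(e.getWorkflowId() + " / " + e.getTaskId()));
        }
    }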
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BiSectionNonBlockingTask.java
similarity index 65%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BiSectionNonBlockingTask.java
index 9033f1a..68b6621 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BiSectionNonBlockingTask.java
@@ -17,26 +17,22 @@
 
 package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
 
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection;
 import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public abstract class BlockingTask extends AbstractTask {
+public abstract class BiSectionNonBlockingTask extends NonBlockingTask {
 
-    private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
-
-    public BlockingTask() {
+    @NonBlockingSection(sectionIndex = 1)
+    public final TaskResult section1() {
+        return beforeSection();
     }
 
-    @Override
-    public TaskResult onRun() {
-        return runBlockingCode();
+    @NonBlockingSection(sectionIndex = 2)
+    public final TaskResult section2() {
+        return afterSection();
     }
 
-    public abstract TaskResult runBlockingCode();
-
-    @Override
-    public void onCancel() {
+    public abstract TaskResult beforeSection();
 
-    }
+    public abstract TaskResult afterSection();
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
new file mode 100644
index 0000000..91b0cc4
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
@@ -0,0 +1,193 @@
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.MFTCallbacSaveRequest;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.client.WorkflowEngineClient;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BiSectionNonBlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
+import org.apache.airavata.mft.api.service.TransferApiRequest;
+import org.apache.airavata.mft.api.service.TransferApiResponse;
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.DelegateAuth;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@NonBlockingTaskDef(name = "AsyncDataTransferTask")
+public class AsyncDataTransferTask extends BiSectionNonBlockingTask {
+
+    private final static Logger logger = LoggerFactory.getLogger(AsyncDataTransferTask.class);
+
+    @TaskParam(name = "SourceResourceId")
+    private final ThreadLocal<String> sourceResourceId = new ThreadLocal<>();
+
+    @TaskParam(name = "DestinationResourceId")
+    private final ThreadLocal<String> destinationResourceId = new ThreadLocal<>();
+
+    @TaskParam(name = "SourceCredToken")
+    private final ThreadLocal<String> sourceCredToken = new ThreadLocal<>();
+
+    @TaskParam(name = "DestinationCredToken")
+    private final ThreadLocal<String> destinationCredToken = new ThreadLocal<>();
+
+    @TaskParam(name = "UserId")
+    private final ThreadLocal<String> userId = new ThreadLocal<>();
+
+    @TaskParam(name = "CallbackUrl")
+    private final ThreadLocal<String> callbackUrl = new ThreadLocal<>();
+
+    @TaskParam(name = "MFTAPIHost")
+    private final ThreadLocal<String> mftHost = new ThreadLocal<>();
+
+    @TaskParam(name = "MFTAPIPort")
+    private final ThreadLocal<Integer> mftPort = new ThreadLocal<>();
+
+    @TaskParam(name = "MFTClientId")
+    private final ThreadLocal<String> mftClientId = new ThreadLocal<>();
+
+    @TaskParam(name = "MFTClientSecret")
+    private final ThreadLocal<String> mftClientSecret = new ThreadLocal<>();
+
+    @TaskParam(name = "MFTCallbackStoreHost")
+    private ThreadLocal<String> mftCallbackStoreHost = new ThreadLocal<>();
+
+    @TaskParam(name = "MFTCallbackStorePort")
+    private ThreadLocal<Integer> mftCallbackStorePort = new ThreadLocal<>();
+
+
+    public TaskResult beforeSection() {
+        MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(getMftHost(), getMftPort());
+        TransferApiResponse submitResponse = mftClient.submitTransfer(TransferApiRequest.newBuilder()
+                .setMftAuthorizationToken(AuthToken.newBuilder()
+                        .setDelegateAuth(
+                                DelegateAuth.newBuilder()
+                                        .setUserId(getUserId())
+                                        .setClientId(getMftClientId())
+                                        .setClientSecret(getMftClientSecret()).build())
+                        .build())
+                .setSourceResourceId(getSourceResourceId())
+                .setSourceToken(getSourceCredToken())
+                .setDestinationResourceId(getDestinationResourceId())
+                .setDestinationToken(getDestinationCredToken())
+                .build());
+
+        logger.info("Submitted transfer {} for task id {}", submitResponse.getTransferId(), getTaskId());
+
+        WorkflowServiceGrpc.WorkflowServiceBlockingStub wfServiceClient = WorkflowEngineClient.buildClient(
+                getMftCallbackStoreHost(), getMftCallbackStorePort());
+
+
+        wfServiceClient.saveMFTCallback(MFTCallbacSaveRequest.newBuilder()
+                .setMftTransferId(submitResponse.getTransferId())
+                .setTaskId(getTaskId())
+                .setWorkflowId(getCallbackContext().getJobConfig().getWorkflow())
+                .setPrevSectionIndex(1).build());
+
+        logger.info("Part 1 completed for task id {}", getTaskId());
+        return new TaskResult(TaskResult.Status.COMPLETED, "Section 1 completed");
+    }
+
+    public TaskResult afterSection() {
+        logger.info("Transfer completed successfully");
+        return new TaskResult(TaskResult.Status.COMPLETED, "Section 2 completed");
+    }
+
+    public String getSourceResourceId() {
+        return sourceResourceId.get();
+    }
+
+    public void setSourceResourceId(String sourceResourceId) {
+        this.sourceResourceId.set(sourceResourceId);
+    }
+
+    public String getDestinationResourceId() {
+        return destinationResourceId.get();
+    }
+
+    public void setDestinationResourceId(String destinationResourceId) {
+        this.destinationResourceId.set(destinationResourceId);
+    }
+
+    public String getSourceCredToken() {
+        return sourceCredToken.get();
+    }
+
+    public void setSourceCredToken(String sourceCredToken) {
+        this.sourceCredToken.set(sourceCredToken);
+    }
+
+    public String getDestinationCredToken() {
+        return destinationCredToken.get();
+    }
+
+    public void setDestinationCredToken(String destinationCredToken) {
+        this.destinationCredToken.set(destinationCredToken);
+    }
+
+    public String getUserId() {
+        return userId.get();
+    }
+
+    public void setUserId(String userId) {
+        this.userId.set(userId);
+    }
+
+    public String getCallbackUrl() {
+        return callbackUrl.get();
+    }
+
+    public void setCallbackUrl(String callbackUrl) {
+        this.callbackUrl.set(callbackUrl);
+    }
+
+    public String getMftHost() {
+        return mftHost.get();
+    }
+
+    public void setMftHost(String mftHost) {
+        this.mftHost.set(mftHost);
+    }
+
+    public Integer getMftPort() {
+        return mftPort.get();
+    }
+
+    public void setMftPort(Integer mftPort) {
+        this.mftPort.set(mftPort);
+    }
+
+    public String getMftClientId() {
+        return mftClientId.get();
+    }
+
+    public void setMftClientId(String mftClientId) {
+        this.mftClientId.set(mftClientId);
+    }
+
+    public String getMftClientSecret() {
+        return mftClientSecret.get();
+    }
+
+    public void setMftClientSecret(String mftClientSecret) {
+        this.mftClientSecret.set(mftClientSecret);
+    }
+
+    public String getMftCallbackStoreHost() {
+        return mftCallbackStoreHost.get();
+    }
+
+    public void setMftCallbackStoreHost(String mftCallbackStoreHost) {
+        this.mftCallbackStoreHost.set(mftCallbackStoreHost);
+    }
+
+    public Integer getMftCallbackStorePort() {
+        return mftCallbackStorePort.get();
+    }
+
+    public void setMftCallbackStorePort(Integer mftCallbackStorePort) {
+        this.mftCallbackStorePort.set(mftCallbackStorePort);
+    }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/application.properties b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/application.properties
similarity index 85%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/application.properties
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/application.properties
index 8b8d553..6f3a5ae 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/application.properties
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/application.properties
@@ -26,6 +26,13 @@ task.list.file=task-list.yaml
 
 datasync.wm.name=datasync_wf
 
+server.port=33335
+
+kafka.url=localhost:9092
+kafka.mft.publisher.name=mft-status-publisher
+kafka.mft.status.publish.topic=mft-status-topic
+kafka.mft.status.consumer.group=mft-event-group
+
 spring.datasource.url = jdbc:mysql://localhost:3306/data_orchestrator?useSSL=false&serverTimezone=UTC&useLegacyDatetimeCode=false&createDatabaseIfNotExist=true&allowPublicKeyRetrieval=true
 spring.datasource.username = root
 
@@ -37,4 +44,7 @@ spring.jpa.properties.hibernate.enable_lazy_load_no_trans=true
 # Hibernate ddl auto (create, create-drop, validate, update)
 spring.jpa.hibernate.ddl-auto = update
 spring.datasource.driver-class-name= com.mysql.cj.jdbc.Driver
-spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
\ No newline at end of file
+spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
+
+datasync.wm.grpc.host=localhost
+datasync.wm.grpc.port=6565
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/logback.xml b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/logback.xml
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/logback.xml
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/logback.xml
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/task-list.yaml b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/task-list.yaml
similarity index 60%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/task-list.yaml
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/task-list.yaml
index e9b17a4..456492f 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/task-list.yaml
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/task-list.yaml
@@ -2,4 +2,5 @@ tasks:
   blocking:
     - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask
   nonBlocking:
-    - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask
\ No newline at end of file
+    - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask
+    - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.AsyncDataTransferTask
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
index 12526a6..64cecad 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
+++ b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
@@ -22,6 +22,7 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine;
 
 import "google/api/annotations.proto";
 import "Common.proto";
+import "google/protobuf/empty.proto";
 
 message WorkflowMessage {
     string message_id = 1;
@@ -37,9 +38,27 @@ message WorkflowInvocationResponse {
     bool status = 1;
 }
 
+message MFTCallbackGetRequest {
+    string mftTransferId = 1;
+}
+
+message MFTCallbackGetResponse {
+    string mftTransferId = 1;
+    int32 prevSectionIndex = 2;
+    string workflowId = 3;
+    string taskId = 4;
+}
+
+message MFTCallbacSaveRequest {
+    string mftTransferId = 1;
+    int32 prevSectionIndex = 2;
+    string workflowId = 3;
+    string taskId = 4;
+}
 
 service WorkflowService {
 
     rpc invokeWorkflow (WorkflowInvocationRequest) returns (WorkflowInvocationResponse);
-
+    rpc getMFTCallback(MFTCallbackGetRequest) returns (MFTCallbackGetResponse);
+    rpc saveMFTCallback(MFTCallbacSaveRequest) returns (google.protobuf.Empty);
 }
\ No newline at end of file
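
The two new RPCs let external components (for example the MFT status consumer) persist and later resolve transfer-to-workflow mappings. A hedged client-side sketch using the existing WorkflowEngineClient helper; the host, port and transfer id are assumptions:

    import org.apache.airavata.datalake.orchestrator.workflow.engine.MFTCallbackGetRequest;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.MFTCallbackGetResponse;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.client.WorkflowEngineClient;

    public class MFTCallbackLookupSketch {
        public static void main(String[] args) {
            // Assumes the workflow engine gRPC API is reachable on localhost:6565.
            WorkflowServiceGrpc.WorkflowServiceBlockingStub client =
                    WorkflowEngineClient.buildClient("localhost", 6565);

            MFTCallbackGetResponse response = client.getMFTCallback(
                    MFTCallbackGetRequest.newBuilder()
                            .setMftTransferId("transfer-42")   // hypothetical MFT transfer id
                            .build());

            System.out.println("Transfer " + response.getMftTransferId()
                    + " belongs to workflow " + response.getWorkflowId()
                    + ", task " + response.getTaskId()
                    + ", resuming after section " + response.getPrevSectionIndex());
        }
    }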
diff --git a/pom.xml b/pom.xml
index 046a1cf..b6bd04f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,7 +151,7 @@
         <snakeyaml.version>1.15</snakeyaml.version>
 
         <yaml.version>1.15</yaml.version>
-        <spring.boot.version>2.2.1.RELEASE</spring.boot.version>
+        <spring.boot.version>2.5.0</spring.boot.version>
         <commons.beanutils.version>1.9.4</commons.beanutils.version>
     </properties>