Posted to commits@airavata.apache.org by di...@apache.org on 2021/06/01 03:03:41 UTC

[airavata-data-lake] 34/42: Workflow engine initial implementation

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git

commit a49c3d4bb383682616d734b8a23c25c60fbf6260
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu May 20 15:42:06 2021 -0400

    Workflow engine initial implementation
---
 .gitignore                                         |   3 +
 data-orchestrator/pom.xml                          |   1 +
 data-orchestrator/workflow-engine/README.md        |  13 ++
 data-orchestrator/workflow-engine/pom.xml          |  72 ++++++
 .../workflow/engine/monitor/AsyncEventMonitor.java |  21 ++
 .../engine/services/controller/Controller.java     |  94 ++++++++
 .../engine/services/participant/Participant.java   | 251 +++++++++++++++++++++
 .../services/wm/DataSyncWorkflowManager.java       |  73 ++++++
 .../engine/services/wm/WorkflowOperator.java       | 182 +++++++++++++++
 .../workflow/engine/task/AbstractTask.java         | 158 +++++++++++++
 .../workflow/engine/task/BlockingTask.java         |  42 ++++
 .../workflow/engine/task/NonBlockingTask.java      |  36 +++
 .../orchestrator/workflow/engine/task/OutPort.java |  31 +++
 .../workflow/engine/task/TaskParamType.java        |  23 ++
 .../engine/task/annotation/BlockingTaskDef.java    |  29 +++
 .../engine/task/annotation/NonBlockingSection.java |  29 +++
 .../engine/task/annotation/NonBlockingTaskDef.java |  29 +++
 .../engine/task/annotation/TaskOutPort.java        |  29 +++
 .../workflow/engine/task/annotation/TaskParam.java |  31 +++
 .../engine/task/impl/ExampleBlockingTask.java      |  36 +++
 .../engine/task/impl/ExampleNonBlockingTask.java   |  28 +++
 .../src/main/resources/application.properties      |  27 +++
 .../workflow-engine/src/main/resources/logback.xml |  50 ++++
 .../src/main/resources/task-list.yaml              |   5 +
 pom.xml                                            |   2 +
 25 files changed, 1295 insertions(+)

diff --git a/.gitignore b/.gitignore
index ab08b32..3583487 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,3 +20,6 @@ data-orchestrator/data-orchestrator-api/target
 /metadata-service/db-service/client/client.iml
 /metadata-service/db-service/server/server.iml
 /metadata-service/db-service/stub/stub.iml
+/metadata-service/data-builders/data-builders.iml
+/metadata-service/db-service/db-service.iml
+/data-orchestrator/workflow-engine/workflow-engine.iml
\ No newline at end of file
diff --git a/data-orchestrator/pom.xml b/data-orchestrator/pom.xml
index 2d3ee54..ee7f550 100644
--- a/data-orchestrator/pom.xml
+++ b/data-orchestrator/pom.xml
@@ -35,6 +35,7 @@
     <modules>
         <module>data-orchestrator-api</module>
         <module>data-orchestrator-core</module>
+        <module>workflow-engine</module>
     </modules>
 
 
diff --git a/data-orchestrator/workflow-engine/README.md b/data-orchestrator/workflow-engine/README.md
new file mode 100644
index 0000000..0efb458
--- /dev/null
+++ b/data-orchestrator/workflow-engine/README.md
@@ -0,0 +1,13 @@
+### Service Execution Order
+
+* org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller.Controller
+* org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant.Participant
+* org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.DataSyncWorkflowManager
+
+### Configure the participant with new tasks
+
+* Extend either the BlockingTask or the NonBlockingTask class
+* Implement the required abstract methods
+* Annotate the class with the @BlockingTaskDef or @NonBlockingTaskDef annotation (see ExampleBlockingTask and ExampleNonBlockingTask)
+* Register the task in src/main/resources/task-list.yaml (a minimal sketch of these steps follows below)
+
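For reference, a minimal sketch of the steps above (not part of this commit; the class name, task name, and parameters are hypothetical, and the actual src/main/resources/task-list.yaml shipped with the commit is not shown in this section):

    package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;

    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
    import org.apache.helix.task.TaskResult;

    // Hypothetical blocking task following the pattern of ExampleBlockingTask
    @BlockingTaskDef(name = "FileCopyTask")
    public class FileCopyTask extends BlockingTask {

        // @TaskParam fields are serialized into the Helix task config by WorkflowOperator
        // and restored by AbstractTask.deserializeTaskData() before onRun() is invoked
        @TaskParam(name = "sourcePath")
        private String sourcePath;

        @TaskParam(name = "destinationPath")
        private String destinationPath;

        // Setters would typically be added so a workflow manager can populate the parameters

        @Override
        public TaskResult runBlockingCode() {
            // Real copy logic would go here; COMPLETED lets the task wired to this task's out port run next
            return new TaskResult(TaskResult.Status.COMPLETED, "Copied " + sourcePath + " to " + destinationPath);
        }
    }

Registration follows the structure that Participant.loadTasks() parses: a root "tasks" map holding "blocking" and "nonBlocking" lists of fully qualified class names, for example:

    tasks:
      blocking:
        - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.FileCopyTask
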
diff --git a/data-orchestrator/workflow-engine/pom.xml b/data-orchestrator/workflow-engine/pom.xml
new file mode 100644
index 0000000..ef96c4f
--- /dev/null
+++ b/data-orchestrator/workflow-engine/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>data-orchestrator</artifactId>
+        <groupId>org.apache.airavata.data.lake</groupId>
+        <version>0.01-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>workflow-engine</artifactId>
+
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.helix</groupId>
+            <artifactId>helix-core</artifactId>
+            <version>1.0.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+            <version>${spring.boot.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>${log4j.over.slf4j}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>${yaml.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
new file mode 100644
index 0000000..4afa566
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor;
+
+public class AsyncEventMonitor {
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
new file mode 100644
index 0000000..83e6af5
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller;
+
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+import java.util.concurrent.CountDownLatch;
+
+@SpringBootApplication()
+@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller")
+public class Controller implements CommandLineRunner {
+
+    private final static Logger logger = LoggerFactory.getLogger(Controller.class);
+
+    @org.springframework.beans.factory.annotation.Value("${cluster.name}")
+    private String clusterName;
+
+    @org.springframework.beans.factory.annotation.Value("${controller.name}")
+    private String controllerName;
+
+    @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
+    private String zkAddress;
+
+    private org.apache.helix.HelixManager zkHelixManager;
+
+    private CountDownLatch startLatch = new CountDownLatch(1);
+    private CountDownLatch stopLatch = new CountDownLatch(1);
+
+    @Override
+    public void run(String... args) throws Exception {
+        logger.info("Starting Cluster Controller ......");
+
+        try {
+            ZkClient zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                    ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+            ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
+
+            // Creates the zk cluster if not available
+            if (!zkHelixAdmin.getClusters().contains(clusterName)) {
+                zkHelixAdmin.addCluster(clusterName, true);
+            }
+
+            zkHelixAdmin.close();
+            zkClient.close();
+
+            logger.info("Connection to helix cluster : " + clusterName + " with name : " + controllerName);
+            logger.info("Zookeeper connection string " + zkAddress);
+
+            zkHelixManager = HelixControllerMain.startHelixController(zkAddress, clusterName,
+                    controllerName, HelixControllerMain.STANDALONE);
+            startLatch.countDown();
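+            // stopLatch is never counted down in this class, so the await below blocks and keeps the controller thread alive until the JVM exits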
+            stopLatch.await();
+        } catch (Exception ex) {
+            logger.error("Error in running the Controller: {}", controllerName, ex);
+        } finally {
+            disconnect();
+        }
+    }
+
+    private void disconnect() {
+        if (zkHelixManager != null) {
+            logger.info("Controller: {}, has disconnected from cluster: {}", controllerName, clusterName);
+            zkHelixManager.disconnect();
+        }
+    }
+
+    public static void main(String args[]) throws Exception {
+        SpringApplication.run(Controller.class);
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
new file mode 100644
index 0000000..a4b9bdd
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.*;
+import java.util.*;
+
+@SpringBootApplication()
+@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant")
+public class Participant implements CommandLineRunner {
+
+    private final static Logger logger = LoggerFactory.getLogger(Participant.class);
+
+    private int shutdownGracePeriod = 30000;
+    private int shutdownGraceRetries = 2;
+
+    @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
+    private String zkAddress;
+
+    @org.springframework.beans.factory.annotation.Value("${cluster.name}")
+    private String clusterName;
+
+    @org.springframework.beans.factory.annotation.Value("${participant.name}")
+    private String participantName;
+
+    @org.springframework.beans.factory.annotation.Value("${task.list.file}")
+    private String taskListFile;
+
+    private ZKHelixManager zkHelixManager;
+
+    private List<String> blockingTaskClasses = new ArrayList<>();
+    private List<String> nonBlockingTaskClasses = new ArrayList<>();
+
+    private final List<String> runningTasks = Collections.synchronizedList(new ArrayList<String>());
+
+    @Override
+    public void run(String... args) throws Exception {
+        logger.info("Staring Participant .....");
+
+        loadTasks();
+
+        ZkClient zkClient = null;
+        try {
+            zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                    ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+            ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
+
+            List<String> nodesInCluster = zkHelixAdmin.getInstancesInCluster(clusterName);
+
+            if (!nodesInCluster.contains(participantName)) {
+                InstanceConfig instanceConfig = new InstanceConfig(participantName);
+                instanceConfig.setHostName("localhost");
+                instanceConfig.setInstanceEnabled(true);
+                zkHelixAdmin.addInstance(clusterName, instanceConfig);
+                logger.info("Participant: " + participantName + " has been added to cluster: " + clusterName);
+
+            } else {
+                zkHelixAdmin.enableInstance(clusterName, participantName, true);
+                logger.debug("Participant: " + participantName + " has been re-enabled at the cluster: " + clusterName);
+            }
+
+            Runtime.getRuntime().addShutdownHook(
+                    new Thread(() -> {
+                        logger.debug("Participant: " + participantName + " shutdown hook called");
+                        try {
+                            zkHelixAdmin.enableInstance(clusterName, participantName, false);
+                        } catch (Exception e) {
+                            logger.warn("Participant: " + participantName + " was not disabled normally", e);
+                        }
+                        disconnect();
+                    })
+            );
+
+            connect();
+
+        } catch (Exception ex) {
+            logger.error("Error running Participant: " + participantName + ", reason: " + ex, ex);
+        } finally {
+            if (zkClient != null) {
+                zkClient.close();
+            }
+        }
+    }
+
+    private void loadTasks() throws Exception {
+
+        try {
+            Yaml yaml = new Yaml();
+            File listFile = new File(taskListFile);
+
+            InputStream stream;
+            if (listFile.exists()) {
+                logger.info("Loading task list file {} from absolute path", taskListFile);
+                stream = new FileInputStream(taskListFile);
+            } else {
+                logger.info("Loading task list file {} from class path", taskListFile);
+                stream = Participant.class.getClassLoader().getResourceAsStream(taskListFile);
+            }
+
+            Object load = yaml.load(stream);
+
+            if (load == null) {
+                throw new Exception("Did not load the configuration from file " + taskListFile);
+            }
+
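+            // Expected YAML shape: a root "tasks" map with optional "blocking" and "nonBlocking" lists of task class names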
+            if (load instanceof Map) {
+                Map rootMap = (Map) load;
+                if (rootMap.containsKey("tasks")) {
+                    Object tasksObj = rootMap.get("tasks");
+                    if (tasksObj instanceof Map) {
+                        Map tasksMap = (Map) tasksObj;
+                        if (tasksMap.containsKey("blocking")) {
+                            Object blockingTaskObj = tasksMap.get("blocking");
+                            if (blockingTaskObj instanceof List) {
+                                blockingTaskClasses = (List<String>) blockingTaskObj;
+                                blockingTaskClasses.forEach(taskClz -> {
+                                    logger.info("Loading blocking task " + taskClz);
+                                });
+                            }
+                        }
+
+                        if (tasksMap.containsKey("nonBlocking")) {
+                            Object nonBlockingTaskObj = tasksMap.get("nonBlocking");
+                            if (nonBlockingTaskObj instanceof List) {
+                                nonBlockingTaskClasses = (List<String>) nonBlockingTaskObj;
+                                nonBlockingTaskClasses.forEach(taskClz -> {
+                                    logger.info("Loading non-blocking task " + taskClz);
+                                });
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (FileNotFoundException e) {
+            logger.error("Failed to load task list from file {}", taskListFile, e);
+            throw e;
+        }
+    }
+
+    private Map<String, TaskFactory> getTaskFactory() throws Exception {
+
+        Map<String, TaskFactory> taskMap = new HashMap<>();
+
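+        // Only blocking task classes are wired into Helix task factories here;
+        // nonBlockingTaskClasses are loaded from the task list but not registered yet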
+        for (String className : blockingTaskClasses) {
+            try {
+                logger.info("Loading blocking task {}", className);
+                Class<?> taskClz = Class.forName(className);
+                Object taskObj = taskClz.getConstructor().newInstance();
+                BlockingTask blockingTask = (BlockingTask) taskObj;
+                TaskFactory taskFactory = context -> {
+                    blockingTask.setCallbackContext(context);
+                    return blockingTask;
+                };
+                BlockingTaskDef btDef = blockingTask.getClass().getAnnotation(BlockingTaskDef.class);
+                taskMap.put(btDef.name(), taskFactory);
+
+            } catch (ClassNotFoundException e) {
+                logger.error("Couldn't find a class with name {}", className);
+                throw e;
+            }
+        }
+        return taskMap;
+    }
+
+    private void connect() {
+        try {
+            zkHelixManager = new ZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, zkAddress);
+            // register online-offline model
+            StateMachineEngine machineEngine = zkHelixManager.getStateMachineEngine();
+            OnlineOfflineStateModelFactory factory = new OnlineOfflineStateModelFactory(participantName);
+            machineEngine.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), factory);
+
+            // register task model
+            machineEngine.registerStateModelFactory("Task", new TaskStateModelFactory(zkHelixManager, getTaskFactory()));
+
+            logger.debug("Participant: " + participantName + ", registered state model factories.");
+
+            zkHelixManager.connect();
+            logger.info("Participant: " + participantName + ", has connected to cluster: " + clusterName);
+
+            Thread.currentThread().join();
+        } catch (InterruptedException ex) {
+            logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex);
+        } catch (Exception ex) {
+            logger.error("Error in connect() for Participant: " + participantName + ", reason: " + ex, ex);
+        } finally {
+            disconnect();
+        }
+    }
+
+    private void disconnect() {
+        logger.info("Shutting down participant. Currently available tasks " + runningTasks.size());
+        if (zkHelixManager != null) {
+            if (runningTasks.size() > 0) {
+                for (int i = 0; i <= shutdownGraceRetries; i++) {
+                    logger.info("Shutting down gracefully [RETRY " + i + "]");
+                    try {
+                        Thread.sleep(shutdownGracePeriod);
+                    } catch (InterruptedException e) {
+                        logger.warn("Waiting for running tasks failed [RETRY " + i + "]", e);
+                    }
+                    if (runningTasks.size() == 0) {
+                        break;
+                    }
+                }
+            }
+            logger.info("Participant: " + participantName + ", has disconnected from cluster: " + clusterName);
+            zkHelixManager.disconnect();
+        }
+    }
+
+    public static void main(String args[]) throws Exception {
+        SpringApplication.run(Participant.class);
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
new file mode 100644
index 0000000..1e987a6
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+@SpringBootApplication()
+@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm")
+public class DataSyncWorkflowManager implements CommandLineRunner {
+
+    private final static Logger logger = LoggerFactory.getLogger(DataSyncWorkflowManager.class);
+
+    @org.springframework.beans.factory.annotation.Value("${cluster.name}")
+    private String clusterName;
+
+    @org.springframework.beans.factory.annotation.Value("${datasync.wm.name}")
+    private String workflowManagerName;
+
+    @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
+    private String zkAddress;
+
+    @Override
+    public void run(String... args) throws Exception {
+        WorkflowOperator workflowOperator = new WorkflowOperator();
+        workflowOperator.init(clusterName, workflowManagerName, zkAddress);
+
+        ExampleBlockingTask bt1 = new ExampleBlockingTask();
+        bt1.setTaskId("bt1-" + UUID.randomUUID());
+
+        ExampleBlockingTask bt2 = new ExampleBlockingTask();
+        bt2.setTaskId("bt2-" + UUID.randomUUID());
+
+        // Setting dependency
+        bt1.setOutPort(new OutPort().setNextTaskId(bt2.getTaskId()));
+
+        Map<String, AbstractTask> taskMap = new HashMap<>();
+        taskMap.put(bt1.getTaskId(), bt1);
+        taskMap.put(bt2.getTaskId(), bt2);
+        String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, bt1.getTaskId());
+        logger.info("Launched workflow {}", workflowId);
+    }
+
+    public static void main(String args[]) throws Exception {
+        SpringApplication.run(DataSyncWorkflowManager.class);
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
new file mode 100644
index 0000000..8590eb5
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskParamType;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.*;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+public class WorkflowOperator {
+
+    private static final long WORKFLOW_EXPIRY_TIME = 1 * 1000;
+    private static final long TASK_EXPIRY_TIME = 24 * 60 * 60 * 1000;
+
+    private TaskDriver taskDriver;
+    private HelixManager helixManager;
+
+    public void init(String clusterName, String workflowManagerName, String zkAddress) throws Exception {
+        helixManager = HelixManagerFactory.getZKHelixManager(clusterName, workflowManagerName,
+                InstanceType.SPECTATOR, zkAddress);
+        helixManager.connect();
+
+        Runtime.getRuntime().addShutdownHook(
+                new Thread() {
+                    @Override
+                    public void run() {
+                        if (helixManager != null && helixManager.isConnected()) {
+                            helixManager.disconnect();
+                        }
+                    }
+                }
+        );
+
+        taskDriver = new TaskDriver(helixManager);
+    }
+
+    public void destroy() {
+        if (helixManager != null) {
+            helixManager.disconnect();
+        }
+    }
+
+    public String buildAndRunWorkflow(Map<String, AbstractTask> taskMap, String startTaskId) throws Exception {
+
+        if (taskDriver == null) {
+            throw new Exception("Workflow operator needs to be initialized");
+        }
+
+        String workflowName = UUID.randomUUID().toString();
+        Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName).setExpiry(0);
+        buildWorkflowRecursively(workflowBuilder, startTaskId, taskMap);
+
+        WorkflowConfig.Builder config = new WorkflowConfig.Builder().setFailureThreshold(0);
+        workflowBuilder.setWorkflowConfig(config.build());
+        workflowBuilder.setExpiry(WORKFLOW_EXPIRY_TIME);
+        Workflow workflow = workflowBuilder.build();
+
+        taskDriver.start(workflow);
+        return workflowName;
+    }
+
+    private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String nextTaskId, Map<String, AbstractTask> taskMap)
+            throws Exception{
+        AbstractTask currentTask = taskMap.get(nextTaskId);
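+        // Each task is mapped to its own single-task Helix job; the job command is taken from the
+        // @BlockingTaskDef annotation, so this currently assumes every task in the map is a blocking task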
+        String taskType = currentTask.getClass().getAnnotation(BlockingTaskDef.class).name();
+        TaskConfig.Builder taskBuilder = new TaskConfig.Builder()
+                .setTaskId(currentTask.getTaskId())
+                .setCommand(taskType);
+
+        Map<String, String> paramMap = serializeTaskData(currentTask);
+        paramMap.forEach(taskBuilder::addConfig);
+
+        List<TaskConfig> taskBuilds = new ArrayList<>();
+        taskBuilds.add(taskBuilder.build());
+
+        JobConfig.Builder job = new JobConfig.Builder()
+                .addTaskConfigs(taskBuilds)
+                .setFailureThreshold(0)
+                .setExpiry(WORKFLOW_EXPIRY_TIME)
+                .setTimeoutPerTask(TASK_EXPIRY_TIME)
+                .setNumConcurrentTasksPerInstance(20)
+                .setMaxAttemptsPerTask(currentTask.getRetryCount());
+
+        workflowBuilder.addJob(currentTask.getTaskId(), job);
+
+        List<OutPort> outPorts = getOutPortsOfTask(currentTask);
+
+        for (OutPort outPort : outPorts) {
+            if (outPort != null) {
+                workflowBuilder.addParentChildDependency(currentTask.getTaskId(), outPort.getNextTaskId());
+                buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap);
+            }
+        }
+    }
+
+    public String getWorkflowStatus(String workflowName) {
+        WorkflowContext workflowContext = taskDriver.getWorkflowContext(workflowName);
+        TaskState workflowState = workflowContext.getWorkflowState();
+        return workflowState.name();
+    }
+
+    public void stopWorkflow(String workflowName) {
+        taskDriver.stop(workflowName);
+    }
+
+    public void resumeWorkflow(String workflowName) {
+        taskDriver.resume(workflowName);
+    }
+
+    public void deleteWorkflow(String workflowName) {
+        taskDriver.delete(workflowName);
+    }
+
+    private <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException {
+
+        Map<String, String> result = new HashMap<>();
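+        // Walk up the class hierarchy so @TaskParam / @TaskOutPort fields declared on AbstractTask are serialized as well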
+        for (Class<?> c = data.getClass(); c != null; c = c.getSuperclass()) {
+            Field[] fields = c.getDeclaredFields();
+            for (Field classField : fields) {
+                TaskParam parm = classField.getAnnotation(TaskParam.class);
+                if (parm != null) {
+                    classField.setAccessible(true);
+                    if (classField.get(data) instanceof TaskParamType) {
+                        result.put(parm.name(), TaskParamType.class.cast(classField.get(data)).serialize());
+                    } else {
+                        result.put(parm.name(), classField.get(data).toString());
+                    }
+                }
+
+                TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
+                if (outPort != null) {
+                    classField.setAccessible(true);
+                    if (classField.get(data) != null) {
+                        result.put(outPort.name(), ((OutPort) classField.get(data)).getNextTaskId().toString());
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    private <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T taskObj) throws IllegalAccessException {
+
+        List<OutPort> outPorts = new ArrayList<>();
+        for (Class<?> c = taskObj.getClass(); c != null; c = c.getSuperclass()) {
+            Field[] fields = c.getDeclaredFields();
+            for (Field field : fields) {
+                TaskOutPort outPortAnnotation = field.getAnnotation(TaskOutPort.class);
+                if (outPortAnnotation != null) {
+                    field.setAccessible(true);
+                    OutPort outPort = (OutPort) field.get(taskObj);
+                    outPorts.add(outPort);
+                }
+            }
+        }
+        return outPorts;
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
new file mode 100644
index 0000000..c9ceee9
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractTask extends UserContentStore implements Task {
+
+    private final static Logger logger = LoggerFactory.getLogger(AbstractTask.class);
+
+    private TaskCallbackContext callbackContext;
+
+    @TaskOutPort(name = "nextTask")
+    private OutPort outPort;
+
+    @TaskParam(name = "taskId")
+    private String taskId;
+
+    @TaskParam(name = "retryCount")
+    private int retryCount = 3;
+
+    public AbstractTask() {
+
+    }
+
+    @Override
+    public TaskResult run() {
+        try {
+            String helixTaskId = this.callbackContext.getTaskConfig().getId();
+            logger.info("Running task {}", helixTaskId);
+            deserializeTaskData(this, this.callbackContext.getTaskConfig().getConfigMap());
+        } catch (Exception e) {
+            logger.error("Failed at deserializing task data", e);
+            return new TaskResult(TaskResult.Status.FAILED, "Failed in deserializing task data");
+        }
+        return onRun();
+    }
+
+    @Override
+    public void cancel() {
+        onCancel();
+    }
+
+    public abstract TaskResult onRun();
+
+    public abstract void onCancel();
+
+    public OutPort getOutPort() {
+        return outPort;
+    }
+
+    public void setOutPort(OutPort outPort) {
+        this.outPort = outPort;
+    }
+
+    public int getRetryCount() {
+        return retryCount;
+    }
+
+    public void setRetryCount(int retryCount) {
+        this.retryCount = retryCount;
+    }
+
+    public TaskCallbackContext getCallbackContext() {
+        return callbackContext;
+    }
+
+    public String getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
+
+    public void setCallbackContext(TaskCallbackContext callbackContext) {
+        this.callbackContext = callbackContext;
+    }
+
+    private <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException {
+
+        List<Field> allFields = new ArrayList<>();
+        Class genericClass = instance.getClass();
+
+        while (AbstractTask.class.isAssignableFrom(genericClass)) {
+            Field[] declaredFields = genericClass.getDeclaredFields();
+            for (Field declaredField : declaredFields) {
+                allFields.add(declaredField);
+            }
+            genericClass = genericClass.getSuperclass();
+        }
+
+        for (Field classField : allFields) {
+            TaskParam param = classField.getAnnotation(TaskParam.class);
+            if (param != null) {
+                if (params.containsKey(param.name())) {
+                    classField.setAccessible(true);
+                    if (classField.getType().isAssignableFrom(String.class)) {
+                        classField.set(instance, params.get(param.name()));
+                    } else if (classField.getType().isAssignableFrom(Integer.class) ||
+                            classField.getType().isAssignableFrom(Integer.TYPE)) {
+                        classField.set(instance, Integer.parseInt(params.get(param.name())));
+                    } else if (classField.getType().isAssignableFrom(Long.class) ||
+                            classField.getType().isAssignableFrom(Long.TYPE)) {
+                        classField.set(instance, Long.parseLong(params.get(param.name())));
+                    } else if (classField.getType().isAssignableFrom(Boolean.class) ||
+                            classField.getType().isAssignableFrom(Boolean.TYPE)) {
+                        classField.set(instance, Boolean.parseBoolean(params.get(param.name())));
+                    } else if (TaskParamType.class.isAssignableFrom(classField.getType())) {
+                        Class<?> clazz = classField.getType();
+                        Constructor<?> ctor = clazz.getConstructor();
+                        Object obj = ctor.newInstance();
+                        ((TaskParamType)obj).deserialize(params.get(param.name()));
+                        classField.set(instance, obj);
+                    }
+                }
+            }
+        }
+
+        for (Field classField : allFields) {
+            TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
+            if (outPort != null) {
+                classField.setAccessible(true);
+                if (params.containsKey(outPort.name())) {
+                    OutPort op = new OutPort();
+                    op.setNextTaskId(params.get(outPort.name()));
+                    // Assign the reconstructed OutPort to the annotated task field
+                    classField.set(instance, op);
+                }
+            }
+        }
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
new file mode 100644
index 0000000..9033f1a
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BlockingTask extends AbstractTask {
+
+    private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
+
+    public BlockingTask() {
+    }
+
+    @Override
+    public TaskResult onRun() {
+        return runBlockingCode();
+    }
+
+    public abstract TaskResult runBlockingCode();
+
+    @Override
+    public void onCancel() {
+
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
new file mode 100644
index 0000000..9d2532c
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+
+import org.apache.helix.task.TaskResult;
+
+public class NonBlockingTask extends AbstractTask {
+
+    public NonBlockingTask() {
+    }
+
+    @Override
+    public TaskResult onRun() {
+        return null;
+    }
+
+    @Override
+    public void onCancel() {
+
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java
new file mode 100644
index 0000000..2b4bb09
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+
+public class OutPort {
+    private String nextTaskId;
+
+    public String getNextTaskId() {
+        return nextTaskId;
+    }
+
+    public OutPort setNextTaskId(String nextTaskId) {
+        this.nextTaskId = nextTaskId;
+        return this;
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java
new file mode 100644
index 0000000..08f0fb0
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+
+public interface TaskParamType {
+    public String serialize();
+    public void deserialize(String content);
+}
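A custom parameter type lets a task field carry structured data through the Helix task config map. Below is a minimal sketch (not part of this commit) of such a type; the class name and host:port format are illustrative. Note that AbstractTask.deserializeTaskData() instantiates these types reflectively, so a public no-argument constructor is required.

    package org.apache.airavata.datalake.orchestrator.workflow.engine.task;

    // Hypothetical composite parameter serialized as "host:port"
    public class EndpointParam implements TaskParamType {

        private String host;
        private int port;

        // No-arg constructor required for reflective instantiation during deserialization
        public EndpointParam() {
        }

        @Override
        public String serialize() {
            return host + ":" + port;
        }

        @Override
        public void deserialize(String content) {
            String[] parts = content.split(":");
            this.host = parts[0];
            this.port = Integer.parseInt(parts[1]);
        }
    }
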
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java
new file mode 100644
index 0000000..06735bc
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface BlockingTaskDef {
+    public String name();
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
new file mode 100644
index 0000000..8047a9b
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface NonBlockingSection {
+    public int order();
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java
new file mode 100644
index 0000000..d38dc30
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface NonBlockingTaskDef {
+    public String name();
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java
new file mode 100644
index 0000000..ce8e9ae
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface TaskOutPort {
+    public String name();
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java
new file mode 100644
index 0000000..fe94273
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface TaskParam {
+    public String name();
+    public String defaultValue() default "";
+    public boolean mandatory() default false;
+}
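
@TaskParam tags configurable task fields with a name, an optional default value, and a mandatory flag. A hedged sketch of how such fields might look and how the metadata can be read back (field names, the taskClass variable, and the validation step are illustrative; Field is java.lang.reflect.Field):

    // Hypothetical parameter fields on a task implementation
    @TaskParam(name = "sourcePath", mandatory = true)
    private String sourcePath;

    @TaskParam(name = "retryCount", defaultValue = "3")
    private String retryCount;

    // Reading the metadata back, e.g. when validating or serializing task state
    for (Field field : taskClass.getDeclaredFields()) {
        TaskParam param = field.getAnnotation(TaskParam.class);
        if (param != null && param.mandatory()) {
            // verify that a value has been supplied for param.name()
        }
    }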
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
new file mode 100644
index 0000000..93ec010
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@BlockingTaskDef(name = "ExampleBlockingTask")
+public class ExampleBlockingTask extends BlockingTask {
+
+    private final static Logger logger = LoggerFactory.getLogger(ExampleBlockingTask.class);
+
+    @Override
+    public TaskResult runBlockingCode() {
+        logger.info("Running example blocking task {}", getTaskId());
+        return new TaskResult(TaskResult.Status.COMPLETED, "Success");
+    }
+}
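
ExampleBlockingTask simply logs its task id and reports success. A minimal sketch of exercising it directly, outside of Helix (in the deployed engine the Participant executes it as a Helix task):

    // Direct invocation sketch; the task id is unset here, so the log line prints null
    ExampleBlockingTask task = new ExampleBlockingTask();
    TaskResult result = task.runBlockingCode();
    // result.getStatus() == TaskResult.Status.COMPLETED and result.getInfo() == "Success"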
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
new file mode 100644
index 0000000..527d0a2
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
+
+@NonBlockingTaskDef(name = "ExampleNonBlockingTask")
+public class ExampleNonBlockingTask extends NonBlockingTask {
+
+    public ExampleNonBlockingTask() {
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/resources/application.properties b/data-orchestrator/workflow-engine/src/main/resources/application.properties
new file mode 100644
index 0000000..ea188ec
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/resources/application.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+#
+
+cluster.name=datalake
+controller.name=datalake_controller
+zookeeper.connection=localhost:2181
+
+participant.name=datalake_participant
+task.list.file=task-list.yaml
+
+datasync.wm.name=datasync_wf
\ No newline at end of file
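
These properties configure the Controller, Participant, and DataSyncWorkflowManager: the Helix cluster and controller names, the ZooKeeper connection string, the participant name and its task list file, and the data-sync workflow manager name. A minimal sketch of reading them from the classpath with plain java.util.Properties (the services themselves may load them differently, e.g. through Spring):

    // Sketch: loading workflow-engine settings from the classpath
    Properties props = new Properties();
    try (InputStream in = Thread.currentThread().getContextClassLoader()
            .getResourceAsStream("application.properties")) {
        props.load(in);
    }
    String clusterName = props.getProperty("cluster.name");          // "datalake"
    String zkConnection = props.getProperty("zookeeper.connection"); // "localhost:2181"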
diff --git a/data-orchestrator/workflow-engine/src/main/resources/logback.xml b/data-orchestrator/workflow-engine/src/main/resources/logback.xml
new file mode 100644
index 0000000..3afe661
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/resources/logback.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<configuration>
+
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="LOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <File>../logs/airavata.log</File>
+        <Append>true</Append>
+        <encoder>
+            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>../logs/airavata.log.%d{yyyy-MM-dd}</fileNamePattern>
+            <maxHistory>30</maxHistory>
+            <totalSizeCap>1GB</totalSizeCap>
+        </rollingPolicy>
+    </appender>
+
+    <logger name="ch.qos.logback" level="WARN"/>
+    <logger name="org.apache.helix" level="WARN"/>
+    <logger name="org.apache.zookeeper" level="ERROR"/>
+    <logger name="org.apache.helix" level="ERROR"/>
+    <logger name="org.apache.airavata" level="INFO"/>
+    <logger name="org.hibernate" level="ERROR"/>
+    <logger name="net.schmizz.sshj" level="WARN"/>
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="LOGFILE"/>
+    </root>
+</configuration>
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/src/main/resources/task-list.yaml b/data-orchestrator/workflow-engine/src/main/resources/task-list.yaml
new file mode 100644
index 0000000..e9b17a4
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/resources/task-list.yaml
@@ -0,0 +1,5 @@
+tasks:
+  blocking:
+    - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask
+  nonBlocking:
+    - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask
\ No newline at end of file
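
task-list.yaml enumerates the blocking and non-blocking task classes that the Participant registers at startup. A hedged sketch of parsing it with SnakeYAML (the yaml.version property added to the root pom below points at SnakeYAML; the actual loading logic lives in the Participant, and exception handling is omitted here):

    // Sketch: reading task class names from task-list.yaml
    Yaml yaml = new Yaml();
    InputStream in = Thread.currentThread().getContextClassLoader()
            .getResourceAsStream("task-list.yaml");
    Map<String, Object> root = (Map<String, Object>) yaml.load(in);
    Map<String, List<String>> tasks = (Map<String, List<String>>) root.get("tasks");
    for (String className : tasks.get("blocking")) {
        Class<?> taskClass = Class.forName(className); // register as a blocking task
    }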
diff --git a/pom.xml b/pom.xml
index 69205f7..9489f35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,6 +147,8 @@
         <neo4j.version>3.4.6</neo4j.version>
         <io.grpc.version>1.25.0</io.grpc.version>
         <spring-security.version>5.3.4.RELEASE</spring-security.version>
+        <yaml.version>1.15</yaml.version>
+        <spring.boot.version>2.2.1.RELEASE</spring.boot.version>
 
     </properties>
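
The two new properties pin the SnakeYAML and Spring Boot versions consumed by the new workflow-engine module. A hedged example of how a module pom can reference them (the actual dependency declarations live in data-orchestrator/workflow-engine/pom.xml, added earlier in this commit):

    <dependency>
        <groupId>org.yaml</groupId>
        <artifactId>snakeyaml</artifactId>
        <version>${yaml.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>${spring.boot.version}</version>
    </dependency>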