Posted to commits@airavata.apache.org by sm...@apache.org on 2017/12/06 03:13:48 UTC

[airavata-sandbox] 04/19: Initial implementation of a task execution engine on top of Helix

This is an automated email from the ASF dual-hosted git repository.

smarru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-sandbox.git

commit 23b761ec3813f6db12ad2d851924d40021017fd9
Author: dimuthu.upeksha2@gmail.com <Di...@1234>
AuthorDate: Sun Nov 19 01:19:04 2017 +0530

    Initial implementation of a task execution engine on top of Helix
---
 .../k8s/compute/impl/SSHComputeOperations.java     | 152 ++++++++++++-------
 airavata-kubernetes/modules/helix-tasks/pom.xml    |  89 +++++++++++
 .../org/apache/airavata/helix/HelixCluster.java    |  48 ++++++
 .../org/apache/airavata/helix/HelixController.java |  85 +++++++++++
 .../org/apache/airavata/helix/HelixManager.java    |  14 ++
 .../apache/airavata/helix/HelixParticipant.java    | 143 ++++++++++++++++++
 .../org/apache/airavata/helix/WorkflowManager.java |  86 +++++++++++
 .../apache/airavata/helix/tasks/AbstractTask.java  | 139 ++++++++++++++++++
 .../airavata/helix/tasks/DataCollectingTask.java   |  32 ++++
 .../airavata/helix/tasks/DataPushingTask.java      |  39 +++++
 .../airavata/helix/tasks/command/CommandTask.java  | 137 +++++++++++++++++
 .../airavata/helix/tasks/command/Participant.java  |  55 +++++++
 .../airavata/helix/tasks/datain/DataInputTask.java | 123 ++++++++++++++++
 .../airavata/helix/tasks/datain/Participant.java   |  56 +++++++
 .../helix/tasks/dataout/DataOutputTask.java        | 128 ++++++++++++++++
 .../airavata/helix/tasks/dataout/Participant.java  |  54 +++++++
 .../src/main/resources/log4j.properties            |   9 ++
 .../api/server/controller/DataStoreController.java |   9 +-
 .../k8s/api/server/model/data/DataStoreModel.java  |  11 +-
 .../k8s/api/server/service/WorkflowService.java    |   6 +-
 .../api/server/service/data/DataStoreService.java  |   7 +-
 .../api/server/service/util/ToResourceUtil.java    |   6 +-
 .../modules/microservices/task-scheduler/pom.xml   |  18 +++
 .../k8s/gfac/core/HelixWorkflowManager.java        | 162 +++++++++++++++++++++
 .../airavata/k8s/gfac/messaging/KafkaReceiver.java |  12 +-
 .../k8s/gfac/service/HelixWorkflowService.java     |  17 +++
 .../airavata/k8s/gfac/service/WorkerService.java   |  31 +++-
 .../src/main/resources/log4j.properties            |   9 ++
 airavata-kubernetes/pom.xml                        |   1 +
 29 files changed, 1593 insertions(+), 85 deletions(-)
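
For orientation, the new classes compose as follows: HelixCluster creates the
Helix cluster in ZooKeeper, HelixController runs a standalone controller,
HelixParticipant subclasses register the task factories they can execute, and
WorkflowManager submits workflows through Helix's TaskDriver. A minimal
single-JVM bootstrap, assuming the localhost addresses hard-coded in the demo
main() methods below, might look like this sketch:

    // Sketch only: addresses and names mirror the demo main() methods in this commit.
    HelixCluster cluster = new HelixCluster("localhost:2199", "AiravataDemoCluster", 1);
    cluster.setup();                    // adds the cluster and its state model defs

    HelixController controller = new HelixController(
            "localhost:2199", "AiravataDemoCluster", "AiravataController");
    controller.start();                 // spawns the controller thread, waits for connect

    // One participant per task type; each tags itself so jobs can target it.
    new Thread(new org.apache.airavata.helix.tasks.command.Participant(
            "localhost:2199", "AiravataDemoCluster", "command-p1",
            org.apache.airavata.helix.tasks.command.CommandTask.NAME,
            "localhost:8080")).start();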

diff --git a/airavata-kubernetes/modules/compute-resource-api/src/main/java/org/apache/airavata/k8s/compute/impl/SSHComputeOperations.java b/airavata-kubernetes/modules/compute-resource-api/src/main/java/org/apache/airavata/k8s/compute/impl/SSHComputeOperations.java
index 140c5b6..fb77981 100644
--- a/airavata-kubernetes/modules/compute-resource-api/src/main/java/org/apache/airavata/k8s/compute/impl/SSHComputeOperations.java
+++ b/airavata-kubernetes/modules/compute-resource-api/src/main/java/org/apache/airavata/k8s/compute/impl/SSHComputeOperations.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -51,46 +50,11 @@ public class SSHComputeOperations implements ComputeOperations {
         this.port = port;
     }
 
-    public ExecutionResult executeCommand(String command) throws JSchException, IOException {
-        JSch jsch = new JSch();
-        Session session = jsch.getSession(userName, this.computeHost, port);
-        session.setConfig("StrictHostKeyChecking", "no");
+    public ExecutionResult executeCommand(String command) throws Exception {
+        Session session = getConnectedSession(this.userName, this.password, this.computeHost, this.port);
 
-        session.setUserInfo(new UserInfo() {
-            @Override
-            public String getPassphrase() {
-                return password;
-            }
-
-            @Override
-            public String getPassword() {
-                return password;
-            }
-
-            @Override
-            public boolean promptPassword(String s) {
-                return true;
-            }
-
-            @Override
-            public boolean promptPassphrase(String s) {
-                return false;
-            }
-
-            @Override
-            public boolean promptYesNo(String s) {
-                return false;
-            }
-
-            @Override
-            public void showMessage(String s) {
-
-            }
-        });
-
-        session.connect();
-        Channel channel=session.openChannel("exec");
-        ((ChannelExec)channel).setCommand(command);
+        Channel channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
 
         ByteArrayOutputStream sysOut = new ByteArrayOutputStream();
         channel.setOutputStream(sysOut);
@@ -104,9 +68,9 @@ public class SSHComputeOperations implements ComputeOperations {
         ExecutionResult result = new ExecutionResult();
         byte[] tmp = new byte[1024];
         while (true) {
-            while (in.available()>0) {
+            while (in.available() > 0) {
                 int i = in.read(tmp, 0, 1024);
-                if (i<0) break;
+                if (i < 0) break;
                 System.out.print(new String(tmp, 0, i));
             }
             if (channel.isClosed()) {
@@ -117,7 +81,8 @@ public class SSHComputeOperations implements ComputeOperations {
             }
             try {
                 Thread.sleep(1000);
-            } catch(Exception e){}
+            } catch (Exception e) {
+                // ignore interruption while polling for the channel to close
+            }
         }
 
         channel.disconnect();
@@ -128,16 +93,27 @@ public class SSHComputeOperations implements ComputeOperations {
         return result;
     }
 
-    public void transferDataIn(String source, String target, String protocol) {
-
+    public void transferDataIn(String source, String target, String protocol) throws Exception {
+        Session session = getConnectedSession(this.userName, this.password, this.computeHost, this.port);
+        copyLocalToRemote(session, source, target);
     }
 
     public void transferDataOut(String source, String target, String protocol) throws Exception {
+        Session session = getConnectedSession(this.userName, this.password, this.computeHost, this.port);
+        copyRemoteToLocal(session, source, target);
+    }
+
+    private static Session getConnectedSession(String userName, String password, String computeHost, int port) throws Exception {
         JSch jsch = new JSch();
-        Session session = jsch.getSession(userName, this.computeHost, port);
+        Session session = jsch.getSession(userName, computeHost, port);
         session.setConfig("StrictHostKeyChecking", "no");
+        session.setUserInfo(getUserInfo(password));
+        session.connect();
+        return session;
+    }
 
-        session.setUserInfo(new UserInfo() {
+    private static UserInfo getUserInfo(String password) {
+        return new UserInfo() {
             @Override
             public String getPassphrase() {
                 return password;
@@ -167,11 +143,79 @@ public class SSHComputeOperations implements ComputeOperations {
             public void showMessage(String s) {
 
             }
-        });
+        };
+    }
 
-        session.connect();
+    private static void copyLocalToRemote(Session session, String source, String target) throws Exception {
 
-        copyRemoteToLocal(session, source, target);
+        FileInputStream fis = null;
+        boolean ptimestamp = true;
+
+        // exec 'scp -t rfile' remotely
+        String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + target;
+        Channel channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+
+        // get I/O streams for remote scp
+        OutputStream out = channel.getOutputStream();
+        InputStream in = channel.getInputStream();
+
+        channel.connect();
+
+        if (checkAck(in) != 0) {
+            return;
+        }
+
+        File _lfile = new File(source);
+
+        if (ptimestamp) {
+            command = "T " + (_lfile.lastModified() / 1000) + " 0";
+            // The access time should be sent here,
+            // but it is not accessible with JavaAPI ;-<
+            command += (" " + (_lfile.lastModified() / 1000) + " 0\n");
+            out.write(command.getBytes());
+            out.flush();
+            if (checkAck(in) != 0) {
+                return;
+            }
+        }
+
+        // send "C0644 filesize filename", where filename should not include '/'
+        long filesize = _lfile.length();
+        command = "C0644 " + filesize + " ";
+        if (source.lastIndexOf('/') > 0) {
+            command += source.substring(source.lastIndexOf('/') + 1);
+        } else {
+            command += source;
+        }
+        command += "\n";
+        out.write(command.getBytes());
+        out.flush();
+        if (checkAck(in) != 0) {
+            return;
+        }
+
+        // send a content of lfile
+        fis = new FileInputStream(source);
+        byte[] buf = new byte[1024];
+        while (true) {
+            int len = fis.read(buf, 0, buf.length);
+            if (len <= 0) break;
+            out.write(buf, 0, len); //out.flush();
+        }
+        fis.close();
+        fis = null;
+        // send '\0'
+        buf[0] = 0;
+        out.write(buf, 0, 1);
+        out.flush();
+        if (checkAck(in) != 0) {
+            return;
+        }
+        out.close();
+
+        channel.disconnect();
+        session.disconnect();
     }
 
     private static void copyRemoteToLocal(Session session, String source, String target) throws JSchException, IOException {
@@ -246,7 +290,7 @@ public class SSHComputeOperations implements ComputeOperations {
             }
 
             if (checkAck(in) != 0) {
-                System.exit(0);
+                return;
             }
 
             // send '\0'
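
The checkAck(...) helper called throughout this file is not part of the diff; it
is defined elsewhere in SSHComputeOperations and, judging from the surrounding
code, follows the standard JSch SCP examples. For reference, a sketch of that
conventional helper (assumed, not taken from this commit):

    private static int checkAck(InputStream in) throws IOException {
        int b = in.read();
        // b: 0 = success, 1 = error, 2 = fatal error, -1 = stream closed
        if (b == 0 || b == -1) {
            return b;
        }
        if (b == 1 || b == 2) {
            StringBuilder sb = new StringBuilder();
            int c;
            do {
                c = in.read();
                sb.append((char) c);
            } while (c != '\n');
            System.err.print(sb.toString()); // error text sent by the remote scp
        }
        return b;
    }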
diff --git a/airavata-kubernetes/modules/helix-tasks/pom.xml b/airavata-kubernetes/modules/helix-tasks/pom.xml
new file mode 100644
index 0000000..568c922
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata-kubernetes</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>helix-tasks</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-web</artifactId>
+            <version>3.0.2.RELEASE</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>api-resource</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>compute-resource-api</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.helix</groupId>
+            <artifactId>helix-core</artifactId>
+            <version>0.6.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.5</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>6.0.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+            <version>0.1.53</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>2.8.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixCluster.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixCluster.java
new file mode 100644
index 0000000..2b96328
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixCluster.java
@@ -0,0 +1,48 @@
+package org.apache.airavata.helix;
+
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class HelixCluster {
+
+    private static final Logger logger = LogManager.getLogger(HelixCluster.class);
+
+    private String zkAddress;
+    private String clusterName;
+    private int numPartitions;
+
+    private ZkClient zkClient;
+    private ZKHelixAdmin zkHelixAdmin;
+
+    public HelixCluster(String zkAddress, String clusterName, int numPartitions) {
+        this.zkAddress = zkAddress;
+        this.clusterName = clusterName;
+        this.numPartitions = numPartitions;
+
+        zkClient = new ZkClient(this.zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+        zkHelixAdmin = new ZKHelixAdmin(zkClient);
+    }
+
+    public void setup() {
+        zkHelixAdmin.addCluster(clusterName, true);
+        zkHelixAdmin.addStateModelDef(clusterName, OnlineOfflineSMD.name, OnlineOfflineSMD.build());
+        logger.info("Cluster: " + clusterName + ", has been added.");
+    }
+
+    public void disconnect() {
+        if (zkClient != null) {
+            zkClient.close();
+        }
+    }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixController.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixController.java
new file mode 100644
index 0000000..8237d43
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixController.java
@@ -0,0 +1,85 @@
+package org.apache.airavata.helix;
+
+import org.apache.helix.*;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class HelixController implements Runnable {
+
+    private static final Logger logger = LogManager.getLogger(HelixController.class);
+
+    private String clusterName;
+    private String controllerName;
+    private String zkAddress;
+    private org.apache.helix.HelixManager zkHelixManager;
+
+    private CountDownLatch startLatch = new CountDownLatch(1);
+    private CountDownLatch stopLatch = new CountDownLatch(1);
+
+    public HelixController(String zkAddress, String clusterName, String controllerName) {
+        this.clusterName = clusterName;
+        this.controllerName = controllerName;
+        this.zkAddress = zkAddress;
+    }
+
+    public void run() {
+        try {
+            zkHelixManager = HelixControllerMain.startHelixController(zkAddress, clusterName,
+                    controllerName, HelixControllerMain.STANDALONE);
+            startLatch.countDown();
+            stopLatch.await();
+        } catch (Exception ex) {
+            logger.error("Error in run() for Controller: " + controllerName + ", reason: " + ex, ex);
+        } finally {
+            disconnect();
+        }
+
+    }
+
+    public void start() {
+        new Thread(this).start();
+        try {
+            startLatch.await();
+            logger.info("Controller: " + controllerName + ", has connected to cluster: " + clusterName);
+
+            Runtime.getRuntime().addShutdownHook(
+                    new Thread() {
+                        @Override
+                        public void run() {
+                            disconnect();
+                        }
+                    }
+            );
+
+        } catch (InterruptedException ex) {
+            logger.error("Controller: " + controllerName + ", is interrupted! reason: " + ex, ex);
+        }
+
+    }
+
+    public void stop() {
+        stopLatch.countDown();
+    }
+
+    private void disconnect() {
+        if (zkHelixManager != null) {
+            logger.info("Controller: " + controllerName + ", has disconnected from cluster: " + clusterName);
+            zkHelixManager.disconnect();
+        }
+    }
+
+    public static void main(String args[]) {
+        HelixController helixController = new HelixController("localhost:2199", "AiravataDemoCluster", "AiravataController");
+        helixController.start();
+    }
+
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixManager.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixManager.java
new file mode 100644
index 0000000..e09f307
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixManager.java
@@ -0,0 +1,14 @@
+package org.apache.airavata.helix;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class HelixManager {
+    public static void main(String args[]) {
+        HelixCluster helixCluster = new HelixCluster("localhost:2199", "AiravataDemoCluster", 1);
+        helixCluster.setup();
+    }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixParticipant.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixParticipant.java
new file mode 100644
index 0000000..cbbc300
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixParticipant.java
@@ -0,0 +1,143 @@
+package org.apache.airavata.helix;
+
+import org.apache.airavata.helix.tasks.command.CommandTask;
+import org.apache.airavata.helix.tasks.DataCollectingTask;
+import org.apache.airavata.helix.tasks.DataPushingTask;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public abstract class HelixParticipant implements Runnable {
+
+    private static final Logger logger = LogManager.getLogger(HelixParticipant.class);
+
+    private String zkAddress;
+    private String clusterName;
+    private String participantName;
+    private ZKHelixManager zkHelixManager;
+    private String taskTypeName;
+    private String apiServerUrl;
+    private RestTemplate restTemplate;
+
+    public HelixParticipant(String zkAddress,
+                            String clusterName,
+                            String participantName,
+                            String taskTypeName,
+                            String apiServerUrl) {
+
+        logger.debug("Initializing Participant Node");
+        this.zkAddress = zkAddress;
+        this.clusterName = clusterName;
+        this.participantName = participantName;
+        this.taskTypeName = taskTypeName;
+        this.apiServerUrl = apiServerUrl;
+        this.restTemplate = new RestTemplate();
+    }
+
+    public abstract Map<String, TaskFactory> getTaskFactory();
+
+    public abstract TaskTypeResource getTaskType();
+
+    public void run() {
+        ZkClient zkClient = null;
+        try {
+            zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                    ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+            ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
+
+            List<String> nodesInCluster = zkHelixAdmin.getInstancesInCluster(clusterName);
+
+            if (!nodesInCluster.contains(participantName)) {
+                InstanceConfig instanceConfig = new InstanceConfig(participantName);
+                instanceConfig.setHostName("localhost");
+                instanceConfig.setInstanceEnabled(true);
+                instanceConfig.addTag(taskTypeName);
+                zkHelixAdmin.addInstance(clusterName, instanceConfig);
+                logger.debug("Instance: " + participantName + ", has been added to cluster: " + clusterName);
+            } else {
+                zkHelixAdmin.addInstanceTag(clusterName, participantName, taskTypeName);
+            }
+
+            Runtime.getRuntime().addShutdownHook(
+                    new Thread() {
+                        @Override
+                        public void run() {
+                            logger.debug("Participant: " + participantName + ", shutdown hook called.");
+                            disconnect();
+                        }
+                    }
+            );
+
+            // connect the participant manager
+            register();
+            connect();
+        } catch (Exception ex) {
+            logger.error("Error in run() for Participant: " + participantName + ", reason: " + ex, ex);
+        } finally {
+            if (zkClient != null) {
+                zkClient.close();
+            }
+        }
+    }
+
+    private void register() {
+        this.restTemplate.postForObject("http://" + apiServerUrl + "/taskType", getTaskType(), Long.class);
+    }
+
+    private void connect() {
+        try {
+            zkHelixManager = new ZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, zkAddress);
+            // register online-offline model
+            StateMachineEngine machineEngine = zkHelixManager.getStateMachineEngine();
+            OnlineOfflineStateModelFactory factory = new OnlineOfflineStateModelFactory(participantName);
+            machineEngine.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), factory);
+
+            // register task model
+            machineEngine.registerStateModelFactory("Task", new TaskStateModelFactory(zkHelixManager, getTaskFactory()));
+            logger.debug("Participant: " + participantName + ", registered state model factories.");
+
+            zkHelixManager.connect();
+            logger.info("Participant: " + participantName + ", has connected to cluster: " + clusterName);
+
+            Thread.currentThread().join();
+        } catch (InterruptedException ex) {
+            logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex);
+        } catch (Exception ex) {
+            logger.error("Error in connect() for Participant: " + participantName + ", reason: " + ex, ex);
+        } finally {
+            disconnect();
+        }
+    }
+
+    private void disconnect() {
+        if (zkHelixManager != null) {
+            logger.info("Participant: " + participantName + ", has disconnected from cluster: " + clusterName);
+            zkHelixManager.disconnect();
+        }
+    }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/WorkflowManager.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/WorkflowManager.java
new file mode 100644
index 0000000..6866af3
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/WorkflowManager.java
@@ -0,0 +1,86 @@
+package org.apache.airavata.helix;
+
+import org.apache.airavata.helix.tasks.command.CommandTask;
+import org.apache.airavata.helix.tasks.DataCollectingTask;
+import org.apache.airavata.helix.tasks.DataPushingTask;
+import org.apache.helix.*;
+import org.apache.helix.task.*;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class WorkflowManager {
+
+    private static final Logger logger = LogManager.getLogger(WorkflowManager.class);
+
+
+    public static void main(String args[]) {
+        Workflow workflow = createWorkflow().build();
+
+        org.apache.helix.HelixManager helixManager = HelixManagerFactory.getZKHelixManager("AiravataDemoCluster", "Admin",
+                InstanceType.SPECTATOR, "localhost:2199");
+
+        try {
+            helixManager.connect();
+            TaskDriver taskDriver = new TaskDriver(helixManager);
+
+            Runtime.getRuntime().addShutdownHook(
+                    new Thread() {
+                        @Override
+                        public void run() {
+                            helixManager.disconnect();
+                        }
+                    }
+            );
+
+            taskDriver.start(workflow);
+            logger.info("Started workflow");
+            TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED);
+            System.out.println("Task state " + taskState.name());
+
+        } catch (Exception ex) {
+            logger.error("Error in connect() for Admin, reason: " + ex, ex);
+        }
+    }
+
+    private static Workflow.Builder createWorkflow() {
+        List<TaskConfig> downloadDataTasks = new ArrayList<>();
+        downloadDataTasks.add(new TaskConfig.Builder().setTaskId("Download_Task").setCommand(DataCollectingTask.NAME).build());
+
+        List<TaskConfig> commandExecuteTasks = new ArrayList<>();
+        commandExecuteTasks.add(new TaskConfig.Builder().setTaskId("Command_Task").setCommand(CommandTask.NAME).build());
+
+        List<TaskConfig> pushDataTasks = new ArrayList<>();
+        pushDataTasks.add(new TaskConfig.Builder().setTaskId("Push_Task").setCommand(DataPushingTask.NAME).build());
+
+        JobConfig.Builder downloadDataJob = new JobConfig.Builder()
+                .addTaskConfigs(downloadDataTasks)
+                .setMaxAttemptsPerTask(3).setInstanceGroupTag("p1");
+
+        JobConfig.Builder commandExecuteJob = new JobConfig.Builder()
+                .addTaskConfigs(commandExecuteTasks)
+                .setMaxAttemptsPerTask(3).setInstanceGroupTag("p2");
+
+        JobConfig.Builder dataPushJob = new JobConfig.Builder()
+                .addTaskConfigs(pushDataTasks)
+                .setMaxAttemptsPerTask(3).setInstanceGroupTag("p3");
+
+        Workflow.Builder workflow = new Workflow.Builder("Airavata_Workflow3").setExpiry(0);
+        workflow.addJob("downloadDataJob", downloadDataJob);
+        workflow.addJob("commandExecuteJob", commandExecuteJob);
+        workflow.addJob("dataPushJob", dataPushJob);
+
+        workflow.addParentChildDependency("downloadDataJob", "commandExecuteJob");
+        workflow.addParentChildDependency("downloadDataJob", "dataPushJob");
+
+        return workflow;
+    }
+}
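
Note that the demo TaskConfigs above carry only a task id and a command, while
CommandTask extends AbstractTask (later in this commit), whose constructor parses
task_id and process_id out of the config map and whose out-port routing reads
OUT_<port> entries. A TaskConfig aimed at those tasks would therefore need
entries along these lines (a sketch with hypothetical values, assuming Helix's
TaskConfig.Builder#addConfig):

    // Sketch: config-map entries AbstractTask-based tasks expect; values are
    // hypothetical. task_id/process_id are parsed in the AbstractTask constructor.
    TaskConfig commandTask = new TaskConfig.Builder()
            .setTaskId("Command_Task")
            .setCommand(CommandTask.NAME)
            .addConfig(AbstractTask.TASK_ID, "1")
            .addConfig(AbstractTask.PROCESS_ID, "1")
            .addConfig(CommandTask.PARAMS.COMMAND, "echo hello")
            .addConfig(CommandTask.PARAMS.COMPUTE_RESOURCE, "1")
            .addConfig("OUT_Out", "dataPushJob") // consumed by sendToOutPort("Out")
            .build();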
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/AbstractTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/AbstractTask.java
new file mode 100644
index 0000000..8e581ea
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/AbstractTask.java
@@ -0,0 +1,139 @@
+package org.apache.airavata.helix.tasks;
+
+import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.airavata.k8s.compute.api.ComputeOperations;
+import org.apache.airavata.k8s.compute.impl.MockComputeOperation;
+import org.apache.airavata.k8s.compute.impl.SSHComputeOperations;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.Properties;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public abstract class AbstractTask extends UserContentStore implements Task {
+
+    public static final String NEXT_JOB = "next-job";
+    public static final String WORKFLOW_STARTED = "workflow-started";
+    public static final String TASK_ID = "task_id";
+    public static final String PROCESS_ID = "process_id";
+
+    //Configurable values
+    private String apiServerUrl = "localhost:8080";
+    private String kafkaBootstrapUrl = "localhost:9092";
+    private String eventTopic = "airavata-task-event";
+
+    private TaskCallbackContext callbackContext;
+    private RestTemplate restTemplate;
+    private Producer<String, String> eventProducer;
+    private long processId;
+    private long taskId;
+
+    public AbstractTask(TaskCallbackContext callbackContext) {
+        this.callbackContext = callbackContext;
+        this.taskId = Long.parseLong(this.callbackContext.getTaskConfig().getConfigMap().get(TASK_ID));
+        this.processId = Long.parseLong(this.callbackContext.getTaskConfig().getConfigMap().get(PROCESS_ID));
+        this.restTemplate = new RestTemplate();
+        initializeKafkaEventProducer();
+        init();
+    }
+
+    public TaskCallbackContext getCallbackContext() {
+        return callbackContext;
+    }
+
+    @Override
+    public final TaskResult run() {
+        boolean isThisNextJob = getUserContent(WORKFLOW_STARTED, Scope.WORKFLOW) == null ||
+                this.callbackContext.getJobConfig().getJobId()
+                        .equals(this.callbackContext.getJobConfig().getWorkflow() + "_" + getUserContent(NEXT_JOB, Scope.WORKFLOW));
+        if (isThisNextJob) {
+            return onRun();
+        } else {
+            return new TaskResult(TaskResult.Status.COMPLETED, "Not a target job");
+        }
+    }
+
+    @Override
+    public final void cancel() {
+        onCancel();
+    }
+
+    public void init() {
+
+    }
+
+    public abstract TaskResult onRun();
+
+    public abstract void onCancel();
+
+    public void sendToOutPort(String port) {
+        putUserContent(WORKFLOW_STARTED, "TRUE", Scope.WORKFLOW);
+        String outJob = getCallbackContext().getTaskConfig().getConfigMap().get("OUT_" + port);
+        if (outJob != null) {
+            putUserContent(NEXT_JOB, outJob, Scope.WORKFLOW);
+        }
+    }
+
+    public RestTemplate getRestTemplate() {
+        return restTemplate;
+    }
+
+    public String getApiServerUrl() {
+        return apiServerUrl;
+    }
+
+    public ComputeOperations fetchComputeResourceOperation(ComputeResource computeResource) throws Exception {
+        ComputeOperations operations;
+        if ("SSH".equals(computeResource.getCommunicationType())) {
+            operations = new SSHComputeOperations(computeResource.getHost(), computeResource.getUserName(), computeResource.getPassword());
+        } else if ("Mock".equals(computeResource.getCommunicationType())) {
+            operations = new MockComputeOperation(computeResource.getHost());
+        } else {
+            throw new Exception("No compatible communication method {" + computeResource.getCommunicationType() + "} found for compute resource " + computeResource.getName());
+        }
+        return operations;
+    }
+
+    public void initializeKafkaEventProducer() {
+        Properties props = new Properties();
+
+        props.put("bootstrap.servers", this.kafkaBootstrapUrl);
+
+        props.put("acks", "all");
+        props.put("retries", 0);
+        props.put("batch.size", 16384);
+        props.put("linger.ms", 1);
+        props.put("buffer.memory", 33554432);
+        props.put("key.serializer",
+                "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer",
+                "org.apache.kafka.common.serialization.StringSerializer");
+
+        eventProducer = new KafkaProducer<String, String>(props);
+    }
+
+    public void publishTaskStatus(long status, String reason) {
+        eventProducer.send(new ProducerRecord<String, String>(
+                this.eventTopic, String.join(",", this.processId + "", this.taskId + "", status + "", reason)));
+    }
+
+    public long getProcessId() {
+        return processId;
+    }
+
+    public long getTaskId() {
+        return taskId;
+    }
+}
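
The status events published by publishTaskStatus(...) are plain CSV strings,
"processId,taskId,status,reason", on the airavata-task-event topic. A minimal
monitoring consumer might look like this sketch (the group id and poll loop are
assumptions, not part of this commit):

    // Sketch: consumes the CSV task-status events published above.
    // Requires org.apache.kafka.clients.consumer.* and java.util.*.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "task-event-monitor"); // hypothetical consumer group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("airavata-task-event"));
        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                String[] parts = record.value().split(",", 4); // reason may contain commas
                System.out.println("process=" + parts[0] + ", task=" + parts[1]
                        + ", status=" + parts[2] + ", reason=" + parts[3]);
            }
        }
    }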
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataCollectingTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataCollectingTask.java
new file mode 100644
index 0000000..594ce35
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataCollectingTask.java
@@ -0,0 +1,32 @@
+package org.apache.airavata.helix.tasks;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class DataCollectingTask extends UserContentStore implements Task {
+
+    public static final String NAME = "DATA_COLLECTING";
+
+    public DataCollectingTask(TaskCallbackContext callbackContext) {
+    }
+
+    public TaskResult run() {
+        System.out.println("Executing data collecting");
+        putUserContent("Key", "Hooo", Scope.WORKFLOW);
+
+        return new TaskResult(TaskResult.Status.COMPLETED, "DataCollectingTask completed!");
+
+    }
+
+    public void cancel() {
+
+    }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataPushingTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataPushingTask.java
new file mode 100644
index 0000000..fc7ee8b
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataPushingTask.java
@@ -0,0 +1,39 @@
+package org.apache.airavata.helix.tasks;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class DataPushingTask extends UserContentStore implements Task {
+
+    public static final String NAME = "DATA_PUSHING";
+
+    public DataPushingTask(TaskCallbackContext callbackContext) {
+    }
+
+    public TaskResult run() {
+        System.out.println("Executing data pushing");
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        System.out.println("Continuing");
+        String key2 = getUserContent("Key2", Scope.WORKFLOW);
+
+        System.out.println(key2);
+        return new TaskResult(TaskResult.Status.COMPLETED, "DataPushingTask completed!");
+
+    }
+
+    public void cancel() {
+
+    }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/CommandTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/CommandTask.java
new file mode 100644
index 0000000..5aae5b7
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/CommandTask.java
@@ -0,0 +1,137 @@
+package org.apache.airavata.helix.tasks.command;
+
+import org.apache.airavata.helix.tasks.AbstractTask;
+import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
+import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskOutPortTypeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.airavata.k8s.compute.api.ExecutionResult;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+
+import java.util.Arrays;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class CommandTask extends AbstractTask {
+
+    public static final String NAME = "COMMAND";
+
+    private String command;
+    private String arguments;
+    private String stdOutPath;
+    private String stdErrPath;
+    private String computeResourceId;
+    private ComputeResource computeResource;
+
+    public CommandTask(TaskCallbackContext callbackContext) {
+        super(callbackContext);
+    }
+
+    @Override
+    public void init() {
+        this.command = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.COMMAND);
+        this.arguments = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.ARGUMENTS);
+        this.stdOutPath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.STD_OUT_PATH);
+        this.stdErrPath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.STD_ERR_PATH);
+        this.computeResourceId = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.COMPUTE_RESOURCE);
+        this.computeResource = this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
+                + "/compute/" + Long.parseLong(this.computeResourceId), ComputeResource.class);
+    }
+
+    public TaskResult onRun() {
+        System.out.println("Executing command " + command);
+        try {
+
+            String stdOutSuffix = " > " + stdOutPath + " 2> " + stdErrPath;
+
+            publishTaskStatus(TaskStatusResource.State.EXECUTING, "");
+
+            String finalCommand = command + (arguments != null && !arguments.isEmpty() ? " " + arguments : "") + stdOutSuffix;
+
+            System.out.println("Executing command " + finalCommand);
+
+            ExecutionResult executionResult = fetchComputeResourceOperation(computeResource).executeCommand(finalCommand);
+
+            if (executionResult.getExitStatus() == 0) {
+                publishTaskStatus(TaskStatusResource.State.COMPLETED, "Task completed");
+                sendToOutPort("Out");
+                return new TaskResult(TaskResult.Status.COMPLETED, "Task completed");
+
+            } else if (executionResult.getExitStatus() == -1) {
+                publishTaskStatus(TaskStatusResource.State.FAILED, "Process didn't exit successfully");
+                sendToOutPort("Error");
+                return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
+
+            } else {
+                publishTaskStatus(TaskStatusResource.State.FAILED, "Process exited with error status " + executionResult.getExitStatus());
+                sendToOutPort("Error");
+                return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
+            }
+
+        } catch (Exception e) {
+
+            e.printStackTrace();
+            publishTaskStatus(TaskStatusResource.State.FAILED, e.getMessage());
+            return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
+        }
+    }
+
+    public void onCancel() {
+
+    }
+
+    public static TaskTypeResource getTaskType() {
+        TaskTypeResource taskTypeResource = new TaskTypeResource();
+        taskTypeResource.setName(NAME);
+        taskTypeResource.setTopicName("airavata-command");
+        taskTypeResource.setIcon("assets/icons/ssh.png");
+        taskTypeResource.getInputTypes().addAll(
+                Arrays.asList(
+                        new TaskInputTypeResource()
+                                .setName(PARAMS.COMMAND)
+                                .setType("String")
+                                .setDefaultValue(""),
+                        new TaskInputTypeResource()
+                                .setName(PARAMS.ARGUMENTS)
+                                .setType("String")
+                                .setDefaultValue(""),
+                        new TaskInputTypeResource()
+                                .setName(PARAMS.COMPUTE_RESOURCE)
+                                .setType("Long")
+                                .setDefaultValue(""),
+                        new TaskInputTypeResource()
+                                .setName(PARAMS.STD_OUT_PATH)
+                                .setType("String")
+                                .setDefaultValue(""),
+                        new TaskInputTypeResource()
+                                .setName(PARAMS.STD_ERR_PATH)
+                                .setType("String")
+                                .setDefaultValue("")));
+
+        taskTypeResource.getOutPorts().addAll(
+                Arrays.asList(
+                        new TaskOutPortTypeResource()
+                                .setName("Out")
+                                .setOrder(0),
+                        new TaskOutPortTypeResource()
+                                .setName("Error")
+                                .setOrder(1))
+        );
+
+        return taskTypeResource;
+    }
+
+    public static final class PARAMS {
+        public static final String COMMAND = "command";
+        public static final String ARGUMENTS = "arguments";
+        public static final String STD_OUT_PATH = "std_out_path";
+        public static final String STD_ERR_PATH = "std_err_path";
+        public static final String COMPUTE_RESOURCE = "compute_resource";
+    }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/Participant.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/Participant.java
new file mode 100644
index 0000000..f852d8f
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/Participant.java
@@ -0,0 +1,55 @@
+package org.apache.airavata.helix.tasks.command;
+
+import org.apache.airavata.helix.HelixParticipant;
+import org.apache.airavata.helix.tasks.DataCollectingTask;
+import org.apache.airavata.helix.tasks.DataPushingTask;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class Participant extends HelixParticipant {
+
+    public Participant(String zkAddress, String clusterName, String participantName, String taskTypeName, String apiServerUrl) {
+        super(zkAddress, clusterName, participantName, taskTypeName, apiServerUrl);
+    }
+
+    @Override
+    public Map<String, TaskFactory> getTaskFactory() {
+        Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+
+        TaskFactory commandTaskFac = new TaskFactory() {
+            @Override
+            public Task createNewTask(TaskCallbackContext context) {
+                return new CommandTask(context);
+            }
+        };
+
+        taskRegistry.put(CommandTask.NAME, commandTaskFac);
+
+        return taskRegistry;
+    }
+
+    @Override
+    public TaskTypeResource getTaskType() {
+        return CommandTask.getTaskType();
+    }
+
+    public static void main(String args[]) {
+        HelixParticipant participant = new Participant(
+                "localhost:2199",
+                "AiravataDemoCluster",
+                "command-p1", CommandTask.NAME,
+                "localhost:8080");
+        new Thread(participant).start();
+    }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/DataInputTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/DataInputTask.java
new file mode 100644
index 0000000..41d6aa4
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/DataInputTask.java
@@ -0,0 +1,123 @@
+package org.apache.airavata.helix.tasks.datain;
+
+import org.apache.airavata.helix.tasks.AbstractTask;
+import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
+import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskOutPortTypeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.UUID;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class DataInputTask extends AbstractTask {
+
+    public static final String NAME = "DATA_INPUT";
+
+    private String remoteSourcePath;
+    private String targetPath;
+    private String computeResourceId;
+    private ComputeResource computeResource;
+
+    public DataInputTask(TaskCallbackContext callbackContext) {
+        super(callbackContext);
+    }
+
+    @Override
+    public void init() {
+        this.remoteSourcePath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.REMOTE_SOURCE_PATH);
+        this.targetPath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.TARGET_PATH);
+        this.computeResourceId = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.COMPUTE_RESOURCE);
+        this.computeResource = this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
+                + "/compute/" + Long.parseLong(this.computeResourceId), ComputeResource.class);
+    }
+
+    @Override
+    public TaskResult onRun() {
+        try {
+            String tempFilePath = "/tmp/" + UUID.randomUUID().toString();
+            System.out.println("Creating tmp file " + tempFilePath);
+            publishTaskStatus(TaskStatusResource.State.EXECUTING, "");
+
+            if (remoteSourcePath.startsWith("http")) {
+                System.out.println("Downloading text file " + remoteSourcePath);
+                FileUtils.copyURLToFile(new URL(remoteSourcePath), new File(tempFilePath));
+
+            } else {
+                publishTaskStatus(TaskStatusResource.State.FAILED, "Unsupported source type");
+                sendToOutPort("Error");
+                return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
+
+            }
+
+            System.out.println("Transferring file to remote resource");
+            fetchComputeResourceOperation(computeResource).transferDataIn(tempFilePath, this.targetPath, "SCP");
+
+            publishTaskStatus(TaskStatusResource.State.COMPLETED, "Task completed");
+            sendToOutPort("Out");
+            return new TaskResult(TaskResult.Status.COMPLETED, "Task completed");
+
+        } catch (Exception e) {
+
+            e.printStackTrace();
+            publishTaskStatus(TaskStatusResource.State.FAILED, e.getMessage());
+            sendToOutPort("Error");
+            return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
+        }
+    }
+
+    @Override
+    public void onCancel() {
+
+    }
+
+    public static TaskTypeResource getTaskType() {
+        TaskTypeResource taskTypeResource = new TaskTypeResource();
+        taskTypeResource.setName(NAME);
+        taskTypeResource.setTopicName("airavata-data-collect");
+        taskTypeResource.setIcon("assets/icons/copy.png");
+        taskTypeResource.getInputTypes().addAll(
+                Arrays.asList(
+                        new TaskInputTypeResource()
+                                .setName(PARAMS.REMOTE_SOURCE_PATH)
+                                .setType("String")
+                                .setDefaultValue(""),
+                        new TaskInputTypeResource()
+                                .setName(PARAMS.TARGET_PATH)
+                                .setType("String")
+                                .setDefaultValue(""),
+                        new TaskInputTypeResource()
+                                .setName(PARAMS.COMPUTE_RESOURCE)
+                                .setType("Long")
+                                .setDefaultValue("")));
+
+        taskTypeResource.getOutPorts().addAll(
+                Arrays.asList(
+                        new TaskOutPortTypeResource()
+                                .setName("Out")
+                                .setOrder(0),
+                        new TaskOutPortTypeResource()
+                                .setName("Error")
+                                .setOrder(1))
+        );
+
+        return taskTypeResource;
+    }
+
+    public static final class PARAMS {
+        public static final String REMOTE_SOURCE_PATH = "remote_source_path";
+        public static final String TARGET_PATH = "target_path";
+        public static final String COMPUTE_RESOURCE = "compute_resource";
+    }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/Participant.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/Participant.java
new file mode 100644
index 0000000..f06f56b
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/Participant.java
@@ -0,0 +1,56 @@
+package org.apache.airavata.helix.tasks.datain;
+
+import org.apache.airavata.helix.HelixParticipant;
+import org.apache.airavata.helix.tasks.command.CommandTask;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class Participant extends HelixParticipant {
+
+    public Participant(String zkAddress, String clusterName, String participantName,
+                       String taskTypeName,
+                       String apiServerUrl) {
+        super(zkAddress, clusterName, participantName, taskTypeName, apiServerUrl);
+    }
+
+    @Override
+    public Map<String, TaskFactory> getTaskFactory() {
+        Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+
+        TaskFactory dataInTask = new TaskFactory() {
+            @Override
+            public Task createNewTask(TaskCallbackContext context) {
+                return new DataInputTask(context);
+            }
+        };
+
+        taskRegistry.put(DataInputTask.NAME, dataInTask);
+
+        return taskRegistry;
+    }
+
+    @Override
+    public TaskTypeResource getTaskType() {
+        return DataInputTask.getTaskType();
+    }
+
+    public static void main(String args[]) {
+        HelixParticipant participant = new Participant(
+                "localhost:2199",
+                "AiravataDemoCluster",
+                "data-in-p1", DataInputTask.NAME,
+                "localhost:8080");
+        new Thread(participant).start();
+    }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/DataOutputTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/DataOutputTask.java
new file mode 100644
index 0000000..314e912
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/DataOutputTask.java
@@ -0,0 +1,128 @@
+package org.apache.airavata.helix.tasks.dataout;
+
+import org.apache.airavata.helix.tasks.AbstractTask;
+import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
+import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskOutPortTypeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.util.LinkedMultiValueMap;
+
+import java.util.Arrays;
+import java.util.UUID;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class DataOutputTask extends AbstractTask {
+
+    public static final String NAME = "DATA_OUTPUT";
+
+    private String sourcePath;
+    private String identifier;
+    private String computeResourceId;
+    private ComputeResource computeResource;
+
+    public DataOutputTask(TaskCallbackContext callbackContext) {
+        super(callbackContext);
+    }
+
+    @Override
+    public void init() {
+        this.sourcePath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.SOURCE_PATH);
+        this.identifier = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.IDENTIFIER);
+        this.computeResourceId = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.COMPUTE_RESOURCE);
+        this.computeResource = this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
+                + "/compute/" + Long.parseLong(this.computeResourceId), ComputeResource.class);
+    }
+
+    @Override
+    public TaskResult onRun() {
+        try {
+            publishTaskStatus(TaskStatusResource.State.EXECUTING, "");
+
+            String temporaryFile = "/tmp/" + UUID.randomUUID().toString();
+            System.out.println("Downloading " + sourcePath + " to " + temporaryFile + " from compute resource "
+                    + computeResource.getName());
+
+            fetchComputeResourceOperation(computeResource).transferDataOut(sourcePath, temporaryFile, "SCP");
+
+            LinkedMultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
+            map.add("file", new FileSystemResource(temporaryFile));
+            HttpHeaders headers = new HttpHeaders();
+            headers.setContentType(MediaType.MULTIPART_FORM_DATA);
+
+            HttpEntity<LinkedMultiValueMap<String, Object>> requestEntity = new HttpEntity<>(map, headers);
+
+            System.out.println("Uploading data file with task id " + getTaskId() + " and identifier "
+                    + identifier + " to data store");
+
+            getRestTemplate().exchange("http://" + getApiServerUrl() + "/data/" + getTaskId() + "/"
+                    + identifier + "/upload", HttpMethod.POST, requestEntity, Long.class);
+
+            publishTaskStatus(TaskStatusResource.State.COMPLETED, "Task completed");
+            sendToOutPort("Out");
+            return new TaskResult(TaskResult.Status.COMPLETED, "Task completed");
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            publishTaskStatus(TaskStatusResource.State.FAILED, e.getMessage());
+            sendToOutPort("Error");
+            return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed: " + e.getMessage());
+
+        }
+    }
+
+    @Override
+    public void onCancel() {
+
+    }
+
+    public static final class PARAMS {
+        public static final String SOURCE_PATH = "source_path";
+        public static final String COMPUTE_RESOURCE = "compute_resource";
+        public static final String IDENTIFIER = "identifier";
+    }
+
+    public static TaskTypeResource getTaskType() {
+        TaskTypeResource taskTypeResource = new TaskTypeResource();
+        taskTypeResource.setName(NAME);
+        taskTypeResource.setTopicName("airavata-data-collect");
+        taskTypeResource.setIcon("assets/icons/copy.png");
+        taskTypeResource.getInputTypes().addAll(
+                Arrays.asList(
+                        new TaskInputTypeResource()
+                                .setName(PARAMS.SOURCE_PATH)
+                                .setType("String")
+                                .setDefaultValue(""),
+                        new TaskInputTypeResource()
+                                .setName(PARAMS.IDENTIFIER)
+                                .setType("String"),
+                        new TaskInputTypeResource()
+                                .setName(PARAMS.COMPUTE_RESOURCE)
+                                .setType("Long")));
+
+        taskTypeResource.getOutPorts().addAll(
+                Arrays.asList(
+                        new TaskOutPortTypeResource()
+                                .setName("Out")
+                                .setOrder(0),
+                        new TaskOutPortTypeResource()
+                                .setName("Error")
+                                .setOrder(1))
+        );
+
+        return taskTypeResource;
+
+    }
+}
\ No newline at end of file
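
The task reads its inputs from the Helix TaskConfig config map that the
workflow builder populates. A minimal sketch of how a DATA_OUTPUT task might
be wired up (the task id, path, and identifier values are hypothetical):

    TaskConfig.Builder dataOut = new TaskConfig.Builder()
            .setTaskId("Task_42")                    // hypothetical id
            .setCommand(DataOutputTask.NAME);        // "DATA_OUTPUT"
    // Keys must match DataOutputTask.PARAMS; values come from the task resource.
    dataOut.addConfig(DataOutputTask.PARAMS.SOURCE_PATH, "/tmp/job-42/result.dat");
    dataOut.addConfig(DataOutputTask.PARAMS.IDENTIFIER, "experiment-output");
    dataOut.addConfig(DataOutputTask.PARAMS.COMPUTE_RESOURCE, "7");
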
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/Participant.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/Participant.java
new file mode 100644
index 0000000..b226511
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/Participant.java
@@ -0,0 +1,54 @@
+package org.apache.airavata.helix.tasks.dataout;
+
+import org.apache.airavata.helix.HelixParticipant;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Helix participant that registers and runs DATA_OUTPUT tasks.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class Participant extends HelixParticipant {
+
+    public Participant(String zkAddress, String clusterName, String participantName,
+                       String taskTypeName, String apiServerUrl) {
+        super(zkAddress, clusterName, participantName, taskTypeName, apiServerUrl);
+    }
+
+    @Override
+    public Map<String, TaskFactory> getTaskFactory() {
+        Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+
+        TaskFactory dataOutTask = new TaskFactory() {
+            @Override
+            public Task createNewTask(TaskCallbackContext context) {
+                return new DataOutputTask(context);
+            }
+        };
+
+        taskRegistry.put(DataOutputTask.NAME, dataOutTask);
+
+        return taskRegistry;
+    }
+
+    @Override
+    public TaskTypeResource getTaskType() {
+        return DataOutputTask.getTaskType();
+    }
+
+    public static void main(String[] args) {
+        HelixParticipant participant = new Participant(
+                "localhost:2199",
+                "AiravataDemoCluster",
+                "data-out-p1", DataOutputTask.NAME,
+                "localhost:8080");
+        new Thread(participant).start();
+    }
+}
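
The three Participant classes (command, datain, dataout) differ only in which
task they instantiate and which type descriptor they return. They could be
collapsed into one generic participant; a sketch under that assumption
(GenericParticipant is hypothetical, not part of this commit):

    import java.util.Collections;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.function.Supplier;

    import org.apache.airavata.helix.HelixParticipant;
    import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
    import org.apache.helix.task.Task;
    import org.apache.helix.task.TaskCallbackContext;
    import org.apache.helix.task.TaskFactory;

    public class GenericParticipant extends HelixParticipant {

        private final String taskName;
        private final Function<TaskCallbackContext, Task> taskBuilder;
        private final Supplier<TaskTypeResource> taskType;

        public GenericParticipant(String zkAddress, String clusterName,
                                  String participantName, String taskName,
                                  String apiServerUrl,
                                  Function<TaskCallbackContext, Task> taskBuilder,
                                  Supplier<TaskTypeResource> taskType) {
            super(zkAddress, clusterName, participantName, taskName, apiServerUrl);
            this.taskName = taskName;
            this.taskBuilder = taskBuilder;
            this.taskType = taskType;
        }

        @Override
        public Map<String, TaskFactory> getTaskFactory() {
            // taskBuilder::apply matches TaskFactory.createNewTask(TaskCallbackContext)
            return Collections.singletonMap(taskName, taskBuilder::apply);
        }

        @Override
        public TaskTypeResource getTaskType() {
            return taskType.get();
        }

        public static void main(String[] args) {
            new Thread(new GenericParticipant(
                    "localhost:2199", "AiravataDemoCluster", "data-out-p1",
                    DataOutputTask.NAME, "localhost:8080",
                    DataOutputTask::new, DataOutputTask::getTaskType)).start();
        }
    }
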
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/resources/log4j.properties b/airavata-kubernetes/modules/helix-tasks/src/main/resources/log4j.properties
new file mode 100644
index 0000000..5e31e3c
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to INFO and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/DataStoreController.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/DataStoreController.java
index 016011d..2a7f073 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/DataStoreController.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/DataStoreController.java
@@ -46,18 +46,19 @@ public class DataStoreController {
     @Resource
     private DataStoreService dataStoreService;
 
-    @PostMapping("{taskId}/{expOutputId}/upload")
+    @PostMapping("{taskId}/{identifier}/upload")
     public long uploadData(@RequestParam("file") MultipartFile file, @PathVariable("taskId") long taskId,
-                           @PathVariable("expOutputId") long expOutputId, RedirectAttributes redirectAttributes) {
+                           @PathVariable("identifier") String identifier, RedirectAttributes redirectAttributes) {
 
-        System.out.println("Received data for task id " + taskId + " and experiment output id " + expOutputId);
+        System.out.println("Received data for task id " + taskId + " and identifier " + identifier);
         if (file.isEmpty()) {
             throw new ServerRuntimeException("Data file is empty");
         }
         try {
             // Get the file and save it somewhere
             byte[] bytes = file.getBytes();
-            return this.dataStoreService.createEntry(taskId, expOutputId, bytes);
+            return this.dataStoreService.createEntry(taskId, identifier, bytes);
+
         } catch (IOException e) {
             e.printStackTrace();
             throw new ServerRuntimeException("Failed to store file", e);
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/data/DataStoreModel.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/data/DataStoreModel.java
index 71df7c4..6b42313 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/data/DataStoreModel.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/data/DataStoreModel.java
@@ -44,8 +44,7 @@ public class DataStoreModel {
     @Basic(fetch = FetchType.LAZY)
     private byte[] content;
 
-    @ManyToOne
-    private ExperimentOutputData experimentOutputData;
+    private String identifier;
 
     @ManyToOne
     private TaskModel taskModel;
@@ -68,12 +67,12 @@ public class DataStoreModel {
         return this;
     }
 
-    public ExperimentOutputData getExperimentOutputData() {
-        return experimentOutputData;
+    public String getIdentifier() {
+        return identifier;
     }
 
-    public DataStoreModel setExperimentOutputData(ExperimentOutputData experimentOutputData) {
-        this.experimentOutputData = experimentOutputData;
+    public DataStoreModel setIdentifier(String identifier) {
+        this.identifier = identifier;
         return this;
     }
 
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
index 68abab4..f4492fe 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
@@ -105,10 +105,10 @@ public class WorkflowService {
             });
 
             parseResult.getEdgeCache().forEach(((outPort, inPort) -> {
-                if (outPort.getParentOperation() != null && outPort.getNextPort().getParentOperation() != null) {
+                if (outPort.getParentTask() != null && outPort.getNextPort().getParentTask() != null) {
                     Optional<TaskOutPort> sourceOutPort = taskOutPortRepository
-                            .findByReferenceIdAndTaskModel_Id(outPort.getId(), outPort.getParentTask().getId());
-                    Optional<TaskModel> targetTask = taskRepository.findById(inPort.getParentTask().getId());
+                            .findByReferenceIdAndTaskModel_Id(outPort.getId(), outPort.getParentTask().getTaskResource().getId());
+                    Optional<TaskModel> targetTask = taskRepository.findById(inPort.getParentTask().getTaskResource().getId());
 
                     taskDAGRepository.save(new TaskDAG()
                             .setSourceOutPort(sourceOutPort.get())
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/data/DataStoreService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/data/DataStoreService.java
index 272ab33..e775acc 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/data/DataStoreService.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/data/DataStoreService.java
@@ -51,10 +51,10 @@ public class DataStoreService {
         this.experimentOutputDataRepository = experimentOutputDataRepository;
     }
 
-    public long createEntry(long taskId, long expOutId, byte[] content) {
+    public long createEntry(long taskId, String identifier, byte[] content) {
         DataStoreModel model = new DataStoreModel();
         model.setTaskModel(taskRepository.findById(taskId).get())
-                .setExperimentOutputData(experimentOutputDataRepository.findById(expOutId).get())
+                .setIdentifier(identifier)
                 .setContent(content);
         return dataStoreRepository.save(model).getId();
     }
@@ -64,8 +64,7 @@ public class DataStoreService {
         List<DataStoreModel> dataStoreModels = this.dataStoreRepository.findByTaskModel_ParentProcess_Id(processId);
         Optional.ofNullable(dataStoreModels).ifPresent(models -> models.forEach(model -> entries.add(new DataEntryResource()
                 .setId(model.getId())
-                .setName(model.getExperimentOutputData().getName())
-                .setDataType(model.getExperimentOutputData().getType().name()))));
+                .setName(model.getIdentifier()))));
         return entries;
     }
 }
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
index acd94c9..22b970f 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
@@ -249,6 +249,10 @@ public class ToResourceUtil {
                             taskStatuses.forEach(taskStatus -> resource.getTaskStatus()
                                     .add(toResource(taskStatus).get())));
 
+            Optional.ofNullable(taskModel.getTaskOutPorts())
+                    .ifPresent(outPorts -> outPorts.forEach(outPort -> resource.getOutPorts()
+                            .add(toResource(outPort).get())));
+
             resource.setOrder(taskModel.getOrderIndex());
             return Optional.of(resource);
         } else {
@@ -442,7 +446,7 @@ public class ToResourceUtil {
             resource.setId(outPort.getId());
             resource.setReferenceId(outPort.getReferenceId());
             resource.setName(outPort.getName());
-            resource.setTaskResource(toResource(outPort.getTaskModel()).get());
+            //resource.setTaskResource(toResource(outPort.getTaskModel()).get());
             return Optional.of(resource);
         } else {
             return Optional.empty();
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml b/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
index 6372cb1..8f502c8 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
@@ -57,6 +57,24 @@
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.helix</groupId>
+            <artifactId>helix-core</artifactId>
+            <version>0.6.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.5</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.5</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
new file mode 100644
index 0000000..e9dead8
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
@@ -0,0 +1,162 @@
+package org.apache.airavata.k8s.gfac.core;
+
+import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
+import org.apache.airavata.k8s.api.resources.task.TaskResource;
+import org.apache.airavata.k8s.gfac.messaging.KafkaSender;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.*;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Builds a Helix task workflow from a process's task DAG and runs it to completion.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class HelixWorkflowManager {
+
+    private static final Logger logger = LogManager.getLogger(HelixWorkflowManager.class);
+
+    private long processId;
+    private List<TaskResource> tasks;
+
+    // out port id, next task id
+    private Map<Long, Long> edgeMap;
+
+    private KafkaSender kafkaSender;
+
+    // Todo abstract out these parameters to reusable class
+    private final RestTemplate restTemplate;
+    private String apiServerUrl;
+
+    public HelixWorkflowManager(long processId, List<TaskResource> tasks, Map<Long, Long> edgeMap,
+                                KafkaSender kafkaSender,
+                                RestTemplate restTemplate, String apiServerUrl) {
+        this.processId = processId;
+        this.tasks = tasks;
+        this.edgeMap = edgeMap;
+        this.kafkaSender = kafkaSender;
+        this.restTemplate = restTemplate;
+        this.apiServerUrl = apiServerUrl;
+    }
+
+    public void launchWorkflow() {
+        org.apache.helix.HelixManager helixManager = HelixManagerFactory.getZKHelixManager("AiravataDemoCluster", "Admin",
+                InstanceType.SPECTATOR, "localhost:2199");
+
+        try {
+
+            Workflow.Builder workflowBuilder = createWorkflow();
+            if (workflowBuilder == null) {
+                throw new Exception("Failed to create a workflow for process id " + processId);
+            }
+            WorkflowConfig.Builder config = new WorkflowConfig.Builder().setFailureThreshold(0);
+            workflowBuilder.setWorkflowConfig(config.build());
+
+            Workflow workflow = workflowBuilder.build();
+
+            helixManager.connect();
+            TaskDriver taskDriver = new TaskDriver(helixManager);
+
+            Runtime.getRuntime().addShutdownHook(
+                    new Thread() {
+                        @Override
+                        public void run() {
+                            helixManager.disconnect();
+                        }
+                    }
+            );
+
+            taskDriver.start(workflow);
+            logger.info("Started workflow");
+            TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED);
+            System.out.println("Workflow state " + taskState.name());
+
+        } catch (Exception ex) {
+            logger.error("Error in connect() for Admin, reason: " + ex, ex);
+        }
+    }
+
+    private Workflow.Builder createWorkflow() {
+        Optional<TaskResource> startingTask = tasks.stream().filter(TaskResource::isStartingTask).findFirst();
+        if (startingTask.isPresent()) {
+            Workflow.Builder workflow = new Workflow.Builder("Airavata_Process_" + processId).setExpiry(0);
+            createWorkflowRecursively(startingTask.get(), workflow, null);
+            return workflow;
+        } else {
+            System.out.println("No starting task for this process " + processId);
+            updateProcessStatus(ProcessStatusResource.State.CANCELED, "No starting task for this process");
+            return null;
+        }
+    }
+
+    private void createWorkflowRecursively(TaskResource taskResource, Workflow.Builder workflow, Long parentTaskId) {
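+        // NOTE: assumes the task graph is a DAG; a cycle in edgeMap would make this recursion loop indefinitely.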
+
+        TaskConfig.Builder taskBuilder = new TaskConfig.Builder().setTaskId("Task_" + taskResource.getId())
+                .setCommand(taskResource.getTaskType().getName());
+
+        Optional.ofNullable(taskResource.getInputs()).ifPresent(inputs -> inputs.forEach(input -> {
+            taskBuilder.addConfig(input.getName(), input.getValue());
+        }));
+
+        taskBuilder.addConfig("task_id", taskResource.getId() + "");
+        taskBuilder.addConfig("process_id", taskResource.getParentProcessId() + "");
+
+        Optional.ofNullable(taskResource.getOutPorts()).ifPresent(outPorts -> outPorts.forEach(outPort -> {
+            Optional.ofNullable(edgeMap.get(outPort.getId())).ifPresent(nextTask -> {
+                Optional<TaskResource> nextTaskResource = tasks.stream().filter(task -> task.getId() == nextTask).findFirst();
+                nextTaskResource.ifPresent(t -> {
+                    taskBuilder.addConfig("OUT_"+ outPort.getName(), "JOB_" + t.getId());
+                });
+            });
+        }));
+
+        List<TaskConfig> taskBuilds = new ArrayList<>();
+        taskBuilds.add(taskBuilder.build());
+
+        JobConfig.Builder job = new JobConfig.Builder()
+                .addTaskConfigs(taskBuilds)
+                .setFailureThreshold(0)
+                .setMaxAttemptsPerTask(3)
+                .setInstanceGroupTag(taskResource.getTaskType().getName());
+
+        workflow.addJob(("JOB_" + taskResource.getId()), job);
+        if (parentTaskId != null) {
+            workflow.addParentChildDependency("JOB_" + parentTaskId, "JOB_" + taskResource.getId());
+        }
+
+        Optional.ofNullable(taskResource.getOutPorts()).ifPresent(outPorts -> outPorts.forEach(outPort -> {
+            Optional.ofNullable(edgeMap.get(outPort.getId())).ifPresent(nextTask -> {
+                Optional<TaskResource> nextTaskResource = tasks.stream().filter(task -> task.getId() == nextTask).findFirst();
+                nextTaskResource.ifPresent(t -> {
+
+                    createWorkflowRecursively(t, workflow, taskResource.getId());
+                });
+            });
+        }));
+    }
+
+    private void updateProcessStatus(ProcessStatusResource.State state) {
+        updateProcessStatus(state, "");
+    }
+
+    private void updateProcessStatus(ProcessStatusResource.State state, String reason) {
+        this.restTemplate.postForObject("http://" + apiServerUrl + "/process/" + this.processId + "/status",
+                new ProcessStatusResource()
+                        .setState(state.getValue())
+                        .setReason(reason)
+                        .setTimeOfStateChange(System.currentTimeMillis()),
+                Long.class);
+    }
+
+}
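
For a simple two-task chain, the recursion above produces one single-task
Helix job per task, with DAG edges mapped to parent-child job dependencies.
A minimal sketch of the resulting structure (task ids and the COMMAND task
name are hypothetical):

    Workflow.Builder workflow = new Workflow.Builder("Airavata_Process_99").setExpiry(0);

    // JOB_1: a command task whose "Out" port feeds task 2
    TaskConfig command = new TaskConfig.Builder()
            .setTaskId("Task_1").setCommand("COMMAND").build();
    workflow.addJob("JOB_1", new JobConfig.Builder()
            .addTaskConfigs(Collections.singletonList(command))
            .setFailureThreshold(0)
            .setMaxAttemptsPerTask(3)
            .setInstanceGroupTag("COMMAND"));    // routes to participants tagged COMMAND

    // JOB_2: the data-out task that ships results to the data store
    TaskConfig dataOut = new TaskConfig.Builder()
            .setTaskId("Task_2").setCommand("DATA_OUTPUT").build();
    workflow.addJob("JOB_2", new JobConfig.Builder()
            .addTaskConfigs(Collections.singletonList(dataOut))
            .setInstanceGroupTag("DATA_OUTPUT"));

    // The edge (task 1 out port -> task 2) becomes a job dependency.
    workflow.addParentChildDependency("JOB_1", "JOB_2");
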
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
index 42aa43d..6a02975 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
@@ -41,10 +41,10 @@ public class KafkaReceiver {
         System.out.println("received process=" + payload);
         workerService.launchProcess(Long.parseLong(payload));
     }
-
-    @KafkaListener(topics = "${task.event.topic.name}", containerFactory = "kafkaEventListenerContainerFactory")
-    public void receiveTaskEvent(TaskContext taskContext) {
-        System.out.println("received event for task id =" + taskContext.getTaskId());
-        workerService.onTaskStateEvent(taskContext);
-    }
+//
+//    @KafkaListener(topics = "${task.event.topic.name}", containerFactory = "kafkaEventListenerContainerFactory")
+//    public void receiveTaskEvent(TaskContext taskContext) {
+//        System.out.println("received event for task id =" + taskContext.getTaskId());
+//        workerService.onTaskStateEvent(taskContext);
+//    }
 }
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java
new file mode 100644
index 0000000..05a1c55
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java
@@ -0,0 +1,17 @@
+package org.apache.airavata.k8s.gfac.service;
+
+import org.springframework.stereotype.Service;
+
+/**
+ * Service stub for launching a process as a Helix workflow; not yet implemented.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Service
+public class HelixWorkflowService {
+
+    public void launchProcess(long processId) {
+
+    }
+}
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
index 5449951..1ccf1d1 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
@@ -23,14 +23,19 @@ import org.apache.airavata.k8s.api.resources.process.ProcessResource;
 import org.apache.airavata.k8s.api.resources.task.TaskDagResource;
 import org.apache.airavata.k8s.api.resources.task.TaskResource;
 import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
+import org.apache.airavata.k8s.gfac.core.HelixWorkflowManager;
 import org.apache.airavata.k8s.gfac.core.ProcessLifeCycleManager;
 import org.apache.airavata.k8s.gfac.messaging.KafkaSender;
 import org.apache.airavata.k8s.task.api.TaskContext;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpMethod;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
 import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * TODO: Class level comments please
@@ -44,6 +49,7 @@ public class WorkerService {
     private final RestTemplate restTemplate;
     private final KafkaSender kafkaSender;
     private Map<Long, ProcessLifeCycleManager> processLifecycleStore = new HashMap<>();
+    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
 
     @Value("${api.server.url}")
     private String apiServerUrl;
@@ -59,8 +65,9 @@ public class WorkerService {
                 ProcessResource.class);
         List<TaskResource> taskResources = processResource.getTasks();
 
-        Set<TaskDagResource> takDagSet = this.restTemplate.getForObject("http://" + apiServerUrl + "/task/dag/"
-                + processId, Set.class);
+        Set<TaskDagResource> takDagSet = this.restTemplate.exchange("http://" + apiServerUrl + "/task/dag/"
+                + processId, HttpMethod.GET, null, new ParameterizedTypeReference<Set<TaskDagResource>>() {})
+                .getBody();
 
         final Map<Long, Long> edgeMap = new HashMap<>();
         Optional.ofNullable(takDagSet)
@@ -68,12 +75,22 @@ public class WorkerService {
                         edgeMap.put(dag.getSourceOutPort().getId(), dag.getTargetTask().getId())));
 
         System.out.println("Starting to execute process " + processId);
-        ProcessLifeCycleManager manager =
-                new ProcessLifeCycleManager(processId, taskResources, edgeMap, kafkaSender, restTemplate, apiServerUrl);
+        //ProcessLifeCycleManager manager =
+        //        new ProcessLifeCycleManager(processId, taskResources, edgeMap, kafkaSender, restTemplate, apiServerUrl);
 
-        manager.init();
-        manager.start();
-        processLifecycleStore.put(processId, manager);
+        //manager.init();
+        //manager.start();
+
+        //processLifecycleStore.put(processId, manager);
+
+        final HelixWorkflowManager helixWorkflowManager = new HelixWorkflowManager(processId, taskResources, edgeMap, kafkaSender, restTemplate, apiServerUrl);
+
+        executorService.execute(helixWorkflowManager::launchWorkflow);
     }
 
     public void onTaskStateEvent(TaskContext taskContext) {
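
The switch from getForObject(..., Set.class) to exchange(...) with a
ParameterizedTypeReference matters because of type erasure: given only
Set.class, Jackson has no element type and deserializes each entry as a
LinkedHashMap, so later casts to TaskDagResource fail at runtime. The
anonymous ParameterizedTypeReference subclass captures Set<TaskDagResource>
so each element binds correctly; a condensed sketch (url is a placeholder):

    // Element type erased: entries come back as LinkedHashMaps, not resources.
    Set rawDags = restTemplate.getForObject(url, Set.class);

    // Element type captured via the anonymous subclass's generic supertype.
    Set<TaskDagResource> dags = restTemplate.exchange(
            url, HttpMethod.GET, null,
            new ParameterizedTypeReference<Set<TaskDagResource>>() {}).getBody();
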
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/log4j.properties b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/log4j.properties
new file mode 100644
index 0000000..0a185ad
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to ERROR and its only appender to A1.
+log4j.rootLogger=ERROR, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
diff --git a/airavata-kubernetes/pom.xml b/airavata-kubernetes/pom.xml
index c758f70..222fffe 100644
--- a/airavata-kubernetes/pom.xml
+++ b/airavata-kubernetes/pom.xml
@@ -40,6 +40,7 @@
         <module>modules/microservices/tasks/data-pushing-task</module>
         <module>modules/microservices/tasks/data-collecting-task</module>
         <module>modules/task-api</module>
+        <module>modules/helix-tasks</module>
     </modules>
 
     <dependencyManagement>

-- 
To stop receiving notification emails like this one, please contact
"commits@airavata.apache.org" <co...@airavata.apache.org>.