You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2017/12/06 03:13:47 UTC

[airavata-sandbox] 03/19: Updated tasks to accept TaskContexts as inputs

This is an automated email from the ASF dual-hosted git repository.

smarru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-sandbox.git

commit 7ee67aa756c3961bafc9573cd2c6a6b2f60e9bae
Author: dimuthu.upeksha2@gmail.com <Di...@1234>
AuthorDate: Wed Nov 8 17:06:59 2017 +0530

    Updated tasks to accept TaskContexts as inputs
---
 .../resources/process/ProcessStatusResource.java   |  59 ++++++++---
 .../k8s/api/resources/task/TaskDagResource.java    |  40 ++++++++
 .../api/resources/task/TaskOutPortResource.java    |  10 ++
 .../k8s/api/resources/task/TaskResource.java       |  22 +++-
 .../api/server/controller/ProcessController.java   |   2 +
 .../k8s/api/server/controller/TaskController.java  |   7 ++
 .../k8s/api/server/model/task/TaskModel.java       |  11 ++
 .../server/repository/task/TaskDAGRepository.java  |   4 +
 .../k8s/api/server/service/WorkflowService.java    |  12 ++-
 .../k8s/api/server/service/task/TaskService.java   |  17 +++-
 .../k8s/api/server/service/util/GraphParser.java   |   4 +-
 .../api/server/service/util/ToResourceUtil.java    |  42 ++++++--
 .../src/main/resources/application.properties      |   3 +-
 .../modules/microservices/task-scheduler/pom.xml   |   5 +
 .../k8s/gfac/core/ProcessLifeCycleManager.java     | 113 +++++++++++++++++----
 .../airavata/k8s/gfac/messaging/KafkaReceiver.java |  11 +-
 .../airavata/k8s/gfac/messaging/KafkaSender.java   |   8 +-
 .../k8s/gfac/messaging/ReceiverConfig.java         |  12 ++-
 .../airavata/k8s/gfac/messaging/SenderConfig.java  |  10 +-
 .../airavata/k8s/gfac/service/WorkerService.java   |  45 ++++----
 .../src/main/resources/application.properties      |   2 +-
 .../k8s/task/job/service/TaskExecutionService.java |  20 ++--
 .../src/main/resources/application.properties      |   4 +-
 .../task/egress/service/TaskExecutionService.java  |  13 ++-
 .../task/ingress/service/TaskExecutionService.java |  13 +--
 .../k8s/task/api/AbstractTaskExecutionService.java |  30 ++++--
 .../apache/airavata/k8s/task/api/TaskContext.java  |  74 +++++++++++++-
 .../k8s/task/api/TaskContextDeserializer.java      |  19 ++++
 .../k8s/task/api/TaskContextSerializer.java        |  22 +++-
 .../k8s/task/api/messaging/KafkaSender.java        |  11 +-
 .../k8s/task/api/messaging/ReceiverConfig.java     |  12 ++-
 .../k8s/task/api/messaging/SenderConfig.java       |  10 +-
 32 files changed, 518 insertions(+), 149 deletions(-)

diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessStatusResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessStatusResource.java
index cfaa212..40fc9b1 100644
--- a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessStatusResource.java
+++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessStatusResource.java
@@ -19,6 +19,9 @@
  */
 package org.apache.airavata.k8s.api.resources.process;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * TODO: Class level comments please
  *
@@ -88,20 +91,46 @@ public class ProcessStatusResource {
         return this;
     }
 
-    public static final class State {
-        public static final int CREATED = 0;
-        public static final int VALIDATED = 1;
-        public static final int STARTED = 2;
-        public static final int PRE_PROCESSING = 3;
-        public static final int CONFIGURING_WORKSPACE = 4;
-        public static final int INPUT_DATA_STAGING = 5;
-        public static final int EXECUTING = 6;
-        public static final int MONITORING = 7;
-        public static final int OUTPUT_DATA_STAGING = 8;
-        public static final int POST_PROCESSING = 9;
-        public static final int COMPLETED = 10;
-        public static final int FAILED = 11;
-        public static final int CANCELLING = 12;
-        public static final int CANCELED = 13;
+    public static enum State {
+
+        CREATED(0),
+        VALIDATED(1),
+        STARTED(2),
+        PRE_PROCESSING(3),
+        CONFIGURING_WORKSPACE(4),
+        INPUT_DATA_STAGING(5),
+        EXECUTING(6),
+        MONITORING(7),
+        OUTPUT_DATA_STAGING(8),
+        POST_PROCESSING(9),
+        COMPLETED(10),
+        FAILED(11),
+        CANCELLING(12),
+        CANCELED(13);
+
+        private final int value;
+
+        private State(int value) {
+            this.value = value;
+        }
+
+        private static Map<Integer, State> map = new HashMap<>();
+
+        static {
+            for (State state : State.values()) {
+                map.put(state.value, state);
+            }
+        }
+
+        public static State valueOf(int taskState) {
+            return map.get(taskState);
+        }
+
+        /**
+         * Get the integer value of this enum value, as defined in the Thrift IDL.
+         */
+        public int getValue() {
+            return value;
+        }
     }
 }
diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskDagResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskDagResource.java
new file mode 100644
index 0000000..bfef611
--- /dev/null
+++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskDagResource.java
@@ -0,0 +1,40 @@
+package org.apache.airavata.k8s.api.resources.task;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class TaskDagResource {
+    private long id;
+    private TaskOutPortResource sourceOutPort;
+    private TaskResource targetTask;
+
+    public long getId() {
+        return id;
+    }
+
+    public TaskDagResource setId(long id) {
+        this.id = id;
+        return this;
+    }
+
+    public TaskOutPortResource getSourceOutPort() {
+        return sourceOutPort;
+    }
+
+    public TaskDagResource setSourceOutPort(TaskOutPortResource sourceOutPort) {
+        this.sourceOutPort = sourceOutPort;
+        return this;
+    }
+
+    public TaskResource getTargetTask() {
+        return targetTask;
+    }
+
+    public TaskDagResource setTargetTask(TaskResource targetTask) {
+        this.targetTask = targetTask;
+        return this;
+    }
+}
diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskOutPortResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskOutPortResource.java
index de4ef1b..2ea8b3f 100644
--- a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskOutPortResource.java
+++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskOutPortResource.java
@@ -10,6 +10,7 @@ public class TaskOutPortResource {
     private long id;
     private String name;
     private int referenceId = 0;
+    private TaskResource taskResource;
 
     public long getId() {
         return id;
@@ -37,4 +38,13 @@ public class TaskOutPortResource {
         this.referenceId = referenceId;
         return this;
     }
+
+    public TaskResource getTaskResource() {
+        return taskResource;
+    }
+
+    public TaskOutPortResource setTaskResource(TaskResource taskResource) {
+        this.taskResource = taskResource;
+        return this;
+    }
 }
diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskResource.java
index bdc34d7..bb54ceb 100644
--- a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskResource.java
+++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskResource.java
@@ -19,6 +19,8 @@
  */
 package org.apache.airavata.k8s.api.resources.task;
 
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -31,8 +33,9 @@ import java.util.List;
 public class TaskResource {
 
     private long id;
+    private int referenceId; // for workflows
     private String name;
-    private long taskTypeId;
+    private TaskTypeResource taskType;
     private String taskTypeStr;
     private long parentProcessId;
     private long creationTime;
@@ -57,12 +60,12 @@ public class TaskResource {
         return this;
     }
 
-    public long getTaskTypeId() {
-        return taskTypeId;
+    public TaskTypeResource getTaskType() {
+        return taskType;
     }
 
-    public TaskResource setTaskTypeId(long taskTypeId) {
-        this.taskTypeId = taskTypeId;
+    public TaskResource setTaskType(TaskTypeResource taskType) {
+        this.taskType = taskType;
         return this;
     }
 
@@ -199,6 +202,15 @@ public class TaskResource {
         return this;
     }
 
+    public int getReferenceId() {
+        return referenceId;
+    }
+
+    public TaskResource setReferenceId(int referenceId) {
+        this.referenceId = referenceId;
+        return this;
+    }
+
     public static final class TaskTypes {
         public static final int ENV_SETUP = 0;
         public static final int INGRESS_DATA_STAGING = 1;
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/ProcessController.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/ProcessController.java
index 7dffe68..93a530c 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/ProcessController.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/ProcessController.java
@@ -20,6 +20,7 @@
 package org.apache.airavata.k8s.api.server.controller;
 
 import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
+import org.apache.airavata.k8s.api.resources.task.TaskDagResource;
 import org.apache.airavata.k8s.api.server.ServerRuntimeException;
 import org.apache.airavata.k8s.api.resources.process.ProcessResource;
 import org.apache.airavata.k8s.api.server.service.ProcessService;
@@ -27,6 +28,7 @@ import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
 
 import javax.annotation.Resource;
+import java.util.Set;
 
 /**
  * TODO: Class level comments please
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/TaskController.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/TaskController.java
index 2fb2334..c72ba47 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/TaskController.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/TaskController.java
@@ -19,6 +19,7 @@
  */
 package org.apache.airavata.k8s.api.server.controller;
 
+import org.apache.airavata.k8s.api.resources.task.TaskDagResource;
 import org.apache.airavata.k8s.api.resources.task.TaskResource;
 import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
 import org.apache.airavata.k8s.api.server.ServerRuntimeException;
@@ -27,6 +28,7 @@ import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
 
 import javax.annotation.Resource;
+import java.util.Set;
 
 /**
  * TODO: Class level comments please
@@ -62,4 +64,9 @@ public class TaskController {
         return this.taskService.findTaskStatusById(id)
                 .orElseThrow(() -> new ServerRuntimeException("Task status with id " + id + " not found"));
     }
+
+    @GetMapping(path = "dag/{process_id}", produces = MediaType.APPLICATION_JSON_VALUE)
+    public Set<TaskDagResource> getDagForProcess(@PathVariable("process_id") long processId) {
+        return this.taskService.getDagForProcess(processId);
+    }
 }
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/task/TaskModel.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/task/TaskModel.java
index 247bae1..4006e57 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/task/TaskModel.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/task/TaskModel.java
@@ -42,6 +42,8 @@ public class TaskModel {
     @GeneratedValue(strategy = GenerationType.AUTO)
     private long id;
 
+    private int referenceId; // to track workflows
+
     private String name;
 
     @ManyToOne
@@ -211,4 +213,13 @@ public class TaskModel {
     public void setTaskOutPorts(List<TaskOutPort> taskOutPorts) {
         this.taskOutPorts = taskOutPorts;
     }
+
+    public int getReferenceId() {
+        return referenceId;
+    }
+
+    public TaskModel setReferenceId(int referenceId) {
+        this.referenceId = referenceId;
+        return this;
+    }
 }
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/task/TaskDAGRepository.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/task/TaskDAGRepository.java
index b2cb7fa..dc64fc0 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/task/TaskDAGRepository.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/task/TaskDAGRepository.java
@@ -3,6 +3,9 @@ package org.apache.airavata.k8s.api.server.repository.task;
 import org.apache.airavata.k8s.api.server.model.task.TaskDAG;
 import org.springframework.data.repository.CrudRepository;
 
+import java.util.Iterator;
+import java.util.Optional;
+
 /**
  * TODO: Class level comments please
  *
@@ -10,4 +13,5 @@ import org.springframework.data.repository.CrudRepository;
  * @since 1.0.0-SNAPSHOT
  */
 public interface TaskDAGRepository extends CrudRepository<TaskDAG, Long> {
+    public Iterable<TaskDAG> findBysourceOutPort_taskModel_parentProcess_id(long processId);
 }
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
index 6090c90..68abab4 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
@@ -3,7 +3,6 @@ package org.apache.airavata.k8s.api.server.service;
 import org.apache.airavata.k8s.api.resources.process.ProcessResource;
 import org.apache.airavata.k8s.api.resources.workflow.WorkflowResource;
 import org.apache.airavata.k8s.api.server.ServerRuntimeException;
-import org.apache.airavata.k8s.api.server.model.process.ProcessModel;
 import org.apache.airavata.k8s.api.server.model.task.TaskDAG;
 import org.apache.airavata.k8s.api.server.model.task.TaskModel;
 import org.apache.airavata.k8s.api.server.model.task.TaskOutPort;
@@ -12,9 +11,11 @@ import org.apache.airavata.k8s.api.server.repository.task.TaskDAGRepository;
 import org.apache.airavata.k8s.api.server.repository.task.TaskOutPortRepository;
 import org.apache.airavata.k8s.api.server.repository.task.TaskRepository;
 import org.apache.airavata.k8s.api.server.repository.workflow.WorkflowRepository;
+import org.apache.airavata.k8s.api.server.service.messaging.MessagingService;
 import org.apache.airavata.k8s.api.server.service.task.TaskService;
 import org.apache.airavata.k8s.api.server.service.util.GraphParser;
 import org.apache.airavata.k8s.api.server.service.util.ToResourceUtil;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
@@ -33,14 +34,19 @@ public class WorkflowService {
 
     private ProcessService processService;
     private TaskService taskService;
+    private MessagingService messagingService;
 
     private WorkflowRepository workflowRepository;
     private TaskOutPortRepository taskOutPortRepository;
     private TaskRepository taskRepository;
     private TaskDAGRepository taskDAGRepository;
 
+    @Value("${scheduler.topic.name}")
+    private String schedulerTopic;
+
     public WorkflowService(ProcessService processService,
                            TaskService taskService,
+                           MessagingService messagingService,
                            WorkflowRepository workflowRepository,
                            TaskOutPortRepository taskOutPortRepository,
                            TaskRepository taskRepository,
@@ -48,6 +54,7 @@ public class WorkflowService {
 
         this.processService = processService;
         this.taskService = taskService;
+        this.messagingService = messagingService;
         this.workflowRepository = workflowRepository;
         this.taskOutPortRepository = taskOutPortRepository;
         this.taskRepository = taskRepository;
@@ -112,6 +119,8 @@ public class WorkflowService {
         } catch (Exception e) {
             throw new ServerRuntimeException("Failed to create workflow", e);
         }
+
+        this.messagingService.send(schedulerTopic, processId + "");
         return 0;
     }
 
@@ -123,6 +132,7 @@ public class WorkflowService {
         return workflowResources;
     }
 
+    @SuppressWarnings("WeakerAccess")
     public Optional<Workflow> findEntityById(long id) {
         return this.workflowRepository.findById(id);
     }
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/task/TaskService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/task/TaskService.java
index d1b3828..9df55ab 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/task/TaskService.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/task/TaskService.java
@@ -19,6 +19,7 @@
  */
 package org.apache.airavata.k8s.api.server.service.task;
 
+import org.apache.airavata.k8s.api.resources.task.TaskDagResource;
 import org.apache.airavata.k8s.api.resources.task.TaskResource;
 import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
 import org.apache.airavata.k8s.api.server.ServerRuntimeException;
@@ -29,8 +30,7 @@ import org.apache.airavata.k8s.api.server.repository.task.type.TaskTypeRepositor
 import org.apache.airavata.k8s.api.server.service.util.ToResourceUtil;
 import org.springframework.stereotype.Service;
 
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
 
 /**
  * TODO: Class level comments please
@@ -79,12 +79,13 @@ public class TaskService {
         taskModel.setStartingTask(resource.isStartingTask());
         taskModel.setStoppingTask(resource.isStoppingTask());
         taskModel.setTaskDetail(resource.getTaskDetail());
+        taskModel.setReferenceId(resource.getReferenceId());
         taskModel.setParentProcess(processRepository.findById(resource.getParentProcessId())
                 .orElseThrow(() -> new ServerRuntimeException("Can not find process with id " +
                         resource.getParentProcessId())));
-        taskModel.setTaskType(taskTypeRepository.findById(resource.getTaskTypeId())
+        taskModel.setTaskType(taskTypeRepository.findById(resource.getTaskType().getId())
                 .orElseThrow(() -> new ServerRuntimeException("Can not find task type with id " +
-                resource.getTaskTypeId())));
+                resource.getTaskType().getId())));
 
         TaskModel savedTask = taskRepository.save(taskModel);
 
@@ -141,4 +142,12 @@ public class TaskService {
         return ToResourceUtil.toResource(taskRepository.findById(id).get());
     }
 
+    public Set<TaskDagResource> getDagForProcess(long processId) {
+        Set<TaskDagResource> taskDagResources = new HashSet<>();
+        Iterable<TaskDAG> taskDags = this.taskDAGRepository.findBysourceOutPort_taskModel_parentProcess_id(processId);
+        Optional.ofNullable(taskDags).ifPresent(dags -> dags.forEach(taskDAG -> {
+           taskDagResources.add(ToResourceUtil.toResource(taskDAG).get());
+        }));
+        return taskDagResources;
+    }
 }
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/GraphParser.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/GraphParser.java
index 9cf1630..9f919f7 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/GraphParser.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/GraphParser.java
@@ -4,6 +4,7 @@ import org.apache.airavata.k8s.api.resources.task.TaskInputResource;
 import org.apache.airavata.k8s.api.resources.task.TaskOutPortResource;
 import org.apache.airavata.k8s.api.resources.task.TaskOutputResource;
 import org.apache.airavata.k8s.api.resources.task.TaskResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
 import org.w3c.dom.*;
 import org.xml.sax.InputSource;
 
@@ -55,12 +56,13 @@ public class GraphParser {
                             taskResource.setName(attr.getNodeValue());
 
                         } else if ("Type".equals(attr.getNodeName())) {
-                            taskResource.setTaskTypeId(Long.parseLong(attr.getNodeValue()));
+                            taskResource.setTaskType(new TaskTypeResource().setId(Long.parseLong(attr.getNodeValue())));
 
                         } else if (attr.getNodeName().startsWith("in-") || attr.getNodeName().startsWith("out-")) {
 
                         } else if ("id".equals(attr.getNodeName())) {
                             id = Integer.parseInt(attr.getNodeValue());
+                            taskResource.setReferenceId(id);
 
                         } else if (attr.getNodeName().startsWith("output-")) {
                             TaskOutputResource outputResource = new TaskOutputResource();
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
index 7631b01..acd94c9 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
@@ -37,16 +37,13 @@ import org.apache.airavata.k8s.api.server.model.experiment.ExperimentOutputData;
 import org.apache.airavata.k8s.api.server.model.experiment.ExperimentStatus;
 import org.apache.airavata.k8s.api.server.model.process.ProcessModel;
 import org.apache.airavata.k8s.api.server.model.process.ProcessStatus;
-import org.apache.airavata.k8s.api.server.model.task.TaskInput;
-import org.apache.airavata.k8s.api.server.model.task.TaskModel;
+import org.apache.airavata.k8s.api.server.model.task.*;
 import org.apache.airavata.k8s.api.resources.application.*;
 import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
 import org.apache.airavata.k8s.api.resources.experiment.ExperimentInputResource;
 import org.apache.airavata.k8s.api.resources.experiment.ExperimentOutputResource;
 import org.apache.airavata.k8s.api.resources.experiment.ExperimentResource;
 import org.apache.airavata.k8s.api.resources.process.ProcessResource;
-import org.apache.airavata.k8s.api.server.model.task.TaskOutput;
-import org.apache.airavata.k8s.api.server.model.task.TaskStatus;
 import org.apache.airavata.k8s.api.server.model.task.type.TaskInputType;
 import org.apache.airavata.k8s.api.server.model.task.type.TaskModelType;
 import org.apache.airavata.k8s.api.server.model.task.type.TaskOutPortType;
@@ -231,10 +228,11 @@ public class ToResourceUtil {
             resource.setLastUpdateTime(taskModel.getLastUpdateTime());
             resource.setCreationTime(taskModel.getCreationTime());
             resource.setParentProcessId(taskModel.getParentProcess().getId());
-            resource.setTaskTypeId(taskModel.getTaskType().getId());
+            resource.setTaskType(toResource(taskModel.getTaskType()).get());
             resource.setTaskDetail(taskModel.getTaskDetail());
             resource.setStartingTask(taskModel.isStartingTask());
             resource.setStoppingTask(taskModel.isStoppingTask());
+            resource.setReferenceId(taskModel.getReferenceId());
             Optional.ofNullable(taskModel.getTaskInputs())
                     .ifPresent(inputs ->
                             inputs.forEach(input -> resource.getInputs()
@@ -306,7 +304,14 @@ public class ToResourceUtil {
             ProcessResource processResource = new ProcessResource();
             processResource.setId(processModel.getId());
             processResource.setLastUpdateTime(processModel.getLastUpdateTime());
-            processResource.setExperimentId(processModel.getExperiment().getId());
+
+            Optional.ofNullable(processModel.getExperiment()).ifPresent(experiment -> {
+                processResource.setExperimentId(experiment.getId());
+            });
+            Optional.ofNullable(processModel.getWorkflow()).ifPresent(workflow -> {
+                processResource.setWorkflowId(workflow.getId());
+            });
+
             processResource.setTaskDag(processModel.getTaskDag());
             processResource.setCreationTime(processModel.getCreationTime());
             Optional.ofNullable(processModel.getProcessStatuses())
@@ -418,4 +423,29 @@ public class ToResourceUtil {
             return Optional.empty();
         }
     }
+
+    public static Optional<TaskDagResource> toResource(TaskDAG taskDAG) {
+        if (taskDAG != null) {
+            TaskDagResource resource = new TaskDagResource();
+            resource.setId(taskDAG.getId());
+            resource.setSourceOutPort(toResource(taskDAG.getSourceOutPort()).get());
+            resource.setTargetTask(toResource(taskDAG.getTargetTask()).get());
+            return Optional.of(resource);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    public static Optional<TaskOutPortResource> toResource(TaskOutPort outPort) {
+        if (outPort != null) {
+            TaskOutPortResource resource = new TaskOutPortResource();
+            resource.setId(outPort.getId());
+            resource.setReferenceId(outPort.getReferenceId());
+            resource.setName(outPort.getName());
+            resource.setTaskResource(toResource(outPort.getTaskModel()).get());
+            return Optional.of(resource);
+        } else {
+            return Optional.empty();
+        }
+    }
 }
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/api-server/src/main/resources/application.properties
index 6f76911..bdcb9bc 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/resources/application.properties
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/resources/application.properties
@@ -2,4 +2,5 @@ spring.jpa.hibernate.ddl-auto=update
 spring.datasource.url=jdbc:mysql://db.default.svc.cluster.local:3306/airavata
 spring.datasource.username=root
 spring.datasource.password=fun123
-launch.topic.name = airavata-launch
\ No newline at end of file
+launch.topic.name = airavata-launch
+scheduler.topic.name = airavata-scheduler
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml b/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
index d2dca7c..6372cb1 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
@@ -39,6 +39,11 @@
             <artifactId>api-resource</artifactId>
             <version>1.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>task-api</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java
index 751b2ec..8508f92 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java
@@ -1,4 +1,4 @@
-/**
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,9 +20,11 @@
 package org.apache.airavata.k8s.gfac.core;
 
 import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
+import org.apache.airavata.k8s.api.resources.task.TaskOutPortResource;
 import org.apache.airavata.k8s.api.resources.task.TaskResource;
 import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
 import org.apache.airavata.k8s.gfac.messaging.KafkaSender;
+import org.apache.airavata.k8s.task.api.TaskContext;
 import org.springframework.web.client.RestTemplate;
 
 import java.util.*;
@@ -36,45 +38,108 @@ import java.util.*;
 public class ProcessLifeCycleManager {
 
     private long processId;
-    private List<TaskResource> taskDag;
-    private Map<Long, Integer> taskPoint;
+    private List<TaskResource> tasks;
+    private TaskResource currentTask;
+    private Map<Long, Long> edgeMap;
+
     private KafkaSender kafkaSender;
 
     // Todo abstract out these parameters to reusable class
     private final RestTemplate restTemplate;
     private String apiServerUrl;
 
-    public ProcessLifeCycleManager(long processId, List<TaskResource> tasks,
+    public ProcessLifeCycleManager(long processId, List<TaskResource> tasks, Map<Long, Long> edgeMap,
                                    KafkaSender kafkaSender,
                                    RestTemplate restTemplate, String apiServerUrl) {
         this.processId = processId;
-        this.taskDag = tasks;
+        this.tasks = tasks;
+        this.edgeMap = edgeMap;
         this.kafkaSender = kafkaSender;
         this.restTemplate = restTemplate;
         this.apiServerUrl = apiServerUrl;
     }
 
     public void init() {
-        taskDag.sort(Comparator.comparing(TaskResource::getOrder));
-        taskPoint = new HashMap<>();
-        for (int i = 0; i < taskDag.size(); i++) {
-            taskPoint.put(taskDag.get(i).getId(), i);
+
+        Optional<TaskResource> startingTask = tasks.stream().filter(TaskResource::isStartingTask).findFirst();
+        if (startingTask.isPresent()) {
+            this.currentTask = startingTask.get();
+        } else {
+            System.out.println("No starting task for this process " + processId);
+            updateProcessStatus(ProcessStatusResource.State.CANCELED, "No starting task for this process");
         }
+
+    }
+
+    public void start() {
         updateProcessStatus(ProcessStatusResource.State.EXECUTING);
+        System.out.println("Starting process " + processId + " with task " + currentTask.getName());
+
+        TaskContext startContext = new TaskContext();
+        startContext.assignTask(currentTask);
+
+        submitTaskToQueue(currentTask.getTaskType().getTopicName(), startContext);
     }
 
-    public synchronized void onTaskStateChanged(long taskId, int state) {
-        switch (state) {
+    public synchronized void onTaskStateChanged(TaskContext taskContext) {
+
+        updateProcessStatus(ProcessStatusResource.State.MONITORING, "Task moved to state "
+                + ProcessStatusResource.State.valueOf(taskContext.getStatus()).name());
+
+        if (taskContext.getTaskId() != currentTask.getId()) {
+            System.out.println("Incompatible task status received. " +
+                    "Currently running task id " + currentTask.getId() + " received task id " + taskContext.getTaskId());
+            updateProcessStatus(ProcessStatusResource.State.FAILED, "Incompatible task status received. " +
+                    "Currently running task id " + currentTask.getId() + " received task id " + taskContext.getTaskId());
+            return;
+        } else {
+            System.out.println("Compatible task status received");
+        }
+
+        switch (taskContext.getStatus()) {
             case TaskStatusResource.State.COMPLETED:
-                System.out.println("Task " + taskId + " was completed");
-                Optional.ofNullable(this.taskPoint.get(taskId)).ifPresent(point -> {
-                    if (point + 1 < taskDag.size()) {
-                        TaskResource resource = taskDag.get(point + 1);
-                        submitTaskToQueue(resource);
+
+                if (currentTask.isStoppingTask()) {
+                    System.out.println("Process completed with last task " + currentTask.getName());
+                    updateProcessStatus(ProcessStatusResource.State.COMPLETED, "Process completed with last task " + currentTask.getName());
+
+                } else {
+                    Optional<TaskOutPortResource> nextOutPort = currentTask.getOutPorts().stream()
+                            .filter(port -> port.getId() == taskContext.getOutPortId()).findFirst();
+                    if (nextOutPort.isPresent()) {
+
+                        if (edgeMap.containsKey(nextOutPort.get().getId())) {
+                            Long nextTaskId = edgeMap.get(nextOutPort.get().getId());
+                            Optional<TaskResource> nextTask = tasks.stream().filter(task -> task.getId() == nextTaskId).findFirst();
+
+                            if (nextTask.isPresent()) {
+
+                                this.currentTask = nextTask.get();
+                                taskContext.assignTask(this.currentTask);
+                                System.out.println("Submitting next task " + this.currentTask.getName() + " of process " + processId);
+                                submitTaskToQueue(this.currentTask.getTaskType().getTopicName(), taskContext);
+
+                            } else {
+                                System.out.println("Next task with id " + nextTaskId + " can not be found");
+                                updateProcessStatus(ProcessStatusResource.State.FAILED, "Next task with id "
+                                        + nextTaskId + " can not be found");
+                                return;
+                            }
+
+                        } else {
+                            System.out.println("Incomplete graph. Next outport " + nextOutPort.get().getName()
+                                    + " of task " + currentTask.getName() + " ends with a no endpoint");
+                            updateProcessStatus(ProcessStatusResource.State.FAILED, "Incomplete graph. Next outport "
+                                    + nextOutPort.get().getName() + " of task " + currentTask.getName()
+                                    + " ends with a no endpoint");
+                            return;
+                        }
                     } else {
-                        updateProcessStatus(ProcessStatusResource.State.COMPLETED);
+                        System.out.println("Invalid out port " + taskContext.getOutPortId() + " for task " + taskContext.getTaskId());
+                        updateProcessStatus(ProcessStatusResource.State.FAILED,
+                                "Invalid out port " + taskContext.getOutPortId() + " for task " + taskContext.getTaskId());
                     }
-                });
+                }
                 break;
             case TaskStatusResource.State.FAILED:
                 updateProcessStatus(ProcessStatusResource.State.FAILED);
@@ -82,14 +147,20 @@ public class ProcessLifeCycleManager {
         }
     }
 
-    public void submitTaskToQueue(TaskResource taskResource) {
+    private void submitTaskToQueue(String topicName, TaskContext taskContext) {
+        updateProcessStatus(ProcessStatusResource.State.MONITORING, "Submitting task " + taskContext.getTaskId() + " to queue");
+        kafkaSender.send(topicName, taskContext);
+    }
 
+    private void updateProcessStatus(ProcessStatusResource.State state) {
+        updateProcessStatus(state, "");
     }
 
-    private void updateProcessStatus(int state) {
+    private void updateProcessStatus(ProcessStatusResource.State state, String reason) {
         this.restTemplate.postForObject("http://" + apiServerUrl + "/process/" + this.processId + "/status",
                 new ProcessStatusResource()
-                        .setState(state)
+                        .setState(state.getValue())
+                        .setReason(reason)
                         .setTimeOfStateChange(System.currentTimeMillis()),
                 Long.class);
     }
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
index 43e7526..42aa43d 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
@@ -20,6 +20,7 @@
 package org.apache.airavata.k8s.gfac.messaging;
 
 import org.apache.airavata.k8s.gfac.service.WorkerService;
+import org.apache.airavata.k8s.task.api.TaskContext;
 import org.springframework.kafka.annotation.KafkaListener;
 
 import javax.annotation.Resource;
@@ -42,12 +43,8 @@ public class KafkaReceiver {
     }
 
     @KafkaListener(topics = "${task.event.topic.name}", containerFactory = "kafkaEventListenerContainerFactory")
-    public void receiveTaskEvent(String payload) {
-        System.out.println("received event=" + payload);
-        String[] eventParts = payload.split(",");
-        long processId = Long.parseLong(eventParts[0]);
-        long taskId = Long.parseLong(eventParts[1]);
-        int state = Integer.parseInt(eventParts[2]);
-        workerService.onTaskStateEvent(processId, taskId, state);
+    public void receiveTaskEvent(TaskContext taskContext) {
+        System.out.println("received event for task id =" + taskContext.getTaskId());
+        workerService.onTaskStateEvent(taskContext);
     }
 }
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java
index f4afe30..c4df008 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java
@@ -19,6 +19,8 @@
  */
 package org.apache.airavata.k8s.gfac.messaging;
 
+import org.apache.airavata.k8s.task.api.TaskContext;
+import org.apache.airavata.k8s.task.api.TaskContextSerializer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.core.KafkaTemplate;
 
@@ -31,9 +33,9 @@ import org.springframework.kafka.core.KafkaTemplate;
 public class KafkaSender {
 
     @Autowired
-    private KafkaTemplate<String, String> kafkaTemplate;
+    private KafkaTemplate<String, TaskContext> kafkaTemplate;
 
-    public void send(String topic, String payload) {
-        kafkaTemplate.send(topic, payload);
+    public void send(String topic, TaskContext taskContext) {
+        kafkaTemplate.send(topic, taskContext);
     }
 }
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java
index cb94135..0b23bdd 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java
@@ -19,6 +19,8 @@
  */
 package org.apache.airavata.k8s.gfac.messaging;
 
+import org.apache.airavata.k8s.task.api.TaskContext;
+import org.apache.airavata.k8s.task.api.TaskContextDeserializer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.beans.factory.annotation.Value;
@@ -69,7 +71,7 @@ public class ReceiverConfig {
         // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TaskContextDeserializer.class);
         // create a random group for each consumer in order to read all events form all consumers
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "event-group-" + UUID.randomUUID().toString());
         return props;
@@ -81,8 +83,8 @@ public class ReceiverConfig {
     }
 
     @Bean
-    public ConsumerFactory<String, String> consumerFactoryForEvents() {
-        return new DefaultKafkaConsumerFactory<String, String>(consumerConfigsForEvents());
+    public ConsumerFactory<String, TaskContext> consumerFactoryForEvents() {
+        return new DefaultKafkaConsumerFactory<String, TaskContext>(consumerConfigsForEvents());
     }
 
     @Bean
@@ -95,8 +97,8 @@ public class ReceiverConfig {
     }
 
     @Bean
-    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaEventListenerContainerFactory() {
-        ConcurrentKafkaListenerContainerFactory<String, String> factory =
+    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, TaskContext>> kafkaEventListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, TaskContext> factory =
                 new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(consumerFactoryForEvents());
         return factory;
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java
index 3bd5303..4c6bf1e 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java
@@ -19,6 +19,8 @@
  */
 package org.apache.airavata.k8s.gfac.messaging;
 
+import org.apache.airavata.k8s.task.api.TaskContext;
+import org.apache.airavata.k8s.task.api.TaskContextSerializer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.beans.factory.annotation.Value;
@@ -48,17 +50,17 @@ public class SenderConfig {
         // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TaskContextSerializer.class);
         return props;
     }
 
     @Bean
-    public ProducerFactory<String, String> producerFactory() {
-        return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
+    public ProducerFactory<String, TaskContext> producerFactory() {
+        return new DefaultKafkaProducerFactory<String, TaskContext>(producerConfigs());
     }
 
     @Bean
-    public KafkaTemplate<String, String> kafkaTemplate() {
+    public KafkaTemplate<String, TaskContext> kafkaTemplate() {
         return new KafkaTemplate<>(producerFactory());
     }
 
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
index 0f20138..5449951 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
@@ -20,18 +20,17 @@
 package org.apache.airavata.k8s.gfac.service;
 
 import org.apache.airavata.k8s.api.resources.process.ProcessResource;
+import org.apache.airavata.k8s.api.resources.task.TaskDagResource;
 import org.apache.airavata.k8s.api.resources.task.TaskResource;
 import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
 import org.apache.airavata.k8s.gfac.core.ProcessLifeCycleManager;
 import org.apache.airavata.k8s.gfac.messaging.KafkaSender;
+import org.apache.airavata.k8s.task.api.TaskContext;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 
 /**
  * TODO: Class level comments please
@@ -59,30 +58,26 @@ public class WorkerService {
         ProcessResource processResource = this.restTemplate.getForObject("http://" + apiServerUrl + "/process/" + processId,
                 ProcessResource.class);
         List<TaskResource> taskResources = processResource.getTasks();
-        boolean freshProcess = true;
-        for (TaskResource taskResource : taskResources) {
-            if (taskResource.getTaskStatus() != null && taskResource.getTaskStatus().size() > 0) {
-                // Already partially completed process. This happens when the task scheduler is killed while processing a process
-                TaskStatusResource lastStatusResource = taskResource.getTaskStatus().get(taskResource.getTaskStatus().size() - 1);
-                // TODO continue from last state
-                freshProcess = false;
-            } else {
-                // Fresh task
 
-            }
-        }
+        Set<TaskDagResource> takDagSet = this.restTemplate.getForObject("http://" + apiServerUrl + "/task/dag/"
+                + processId, Set.class);
 
-        if (freshProcess) {
-            System.out.println("Starting to execute process " + processId);
-            ProcessLifeCycleManager manager = new ProcessLifeCycleManager(processId, taskResources, kafkaSender, restTemplate, apiServerUrl);
-            manager.init();
-            manager.submitTaskToQueue(taskResources.get(0));
-            processLifecycleStore.put(processId, manager);
-        }
+        final Map<Long, Long> edgeMap = new HashMap<>();
+        Optional.ofNullable(takDagSet)
+                .ifPresent(dags -> dags.forEach(dag ->
+                        edgeMap.put(dag.getSourceOutPort().getId(), dag.getTargetTask().getId())));
+
+        System.out.println("Starting to execute process " + processId);
+        ProcessLifeCycleManager manager =
+                new ProcessLifeCycleManager(processId, taskResources, edgeMap, kafkaSender, restTemplate, apiServerUrl);
+
+        manager.init();
+        manager.start();
+        processLifecycleStore.put(processId, manager);
     }
 
-    public void onTaskStateEvent(long processId, long taskId, int state) {
-        Optional.ofNullable(processLifecycleStore.get(processId))
-                .ifPresent(manager -> manager.onTaskStateChanged(taskId, state));
+    public void onTaskStateEvent(TaskContext taskContext) {
+        Optional.ofNullable(processLifecycleStore.get(taskContext.getProcessId()))
+                .ifPresent(manager -> manager.onTaskStateChanged(taskContext));
     }
 }
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties
index c4aed73..c23a904 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties
@@ -1,5 +1,5 @@
 server.port = 8195
 api.server.url = api-server.default.svc.cluster.local:8080
 scheduler.topic.name = airavata-scheduler
-scheduler.group.name = gfac
+scheduler.group.name = task-scheduler
 task.event.topic.name = airavata-task-event
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java
index 23c147e..2fdb337 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java
+++ b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java
@@ -53,12 +53,12 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
     @Override
     public void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception {
 
-        taskContext.getLocalContext().put(CommandTaskInfo.COMMAND, findInput(taskResource, CommandTaskInfo.COMMAND, false));
-        taskContext.getLocalContext().put(CommandTaskInfo.ARGUMENTS, findInput(taskResource, CommandTaskInfo.ARGUMENTS, true));
-        taskContext.getLocalContext().put(CommandTaskInfo.STD_OUT_PATH, findInput(taskResource, CommandTaskInfo.STD_OUT_PATH, false));
-        taskContext.getLocalContext().put(CommandTaskInfo.STD_ERR_PATH, findInput(taskResource, CommandTaskInfo.STD_ERR_PATH, false));
+        taskContext.getLocalContext().put(CommandTaskInfo.COMMAND, findInput(taskContext, taskResource, CommandTaskInfo.COMMAND, false));
+        taskContext.getLocalContext().put(CommandTaskInfo.ARGUMENTS, findInput(taskContext, taskResource, CommandTaskInfo.ARGUMENTS, true));
+        taskContext.getLocalContext().put(CommandTaskInfo.STD_OUT_PATH, findInput(taskContext, taskResource, CommandTaskInfo.STD_OUT_PATH, false));
+        taskContext.getLocalContext().put(CommandTaskInfo.STD_ERR_PATH, findInput(taskContext, taskResource, CommandTaskInfo.STD_ERR_PATH, false));
 
-        String computeId = findInput(taskResource, CommandTaskInfo.COMPUTE_RESOURCE, false);
+        String computeId = findInput(taskContext, taskResource, CommandTaskInfo.COMPUTE_RESOURCE, false);
         taskContext.getLocalContext().put(CommandTaskInfo.COMPUTE_RESOURCE, this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
                 + "/compute/" + Long.parseLong(computeId), ComputeResource.class));
 
@@ -75,7 +75,7 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
             String stdErrPath = (String) taskContext.getLocalContext().get(CommandTaskInfo.STD_ERR_PATH);
             String stdOutSuffix = " > " + stdOutPath + " 2> " + stdErrPath;
 
-            publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.EXECUTING);
+            publishTaskStatus(taskContext, TaskStatusResource.State.EXECUTING);
 
             String finalCommand = command + (arguments != null ? arguments : "") + stdOutSuffix;
 
@@ -84,17 +84,17 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
             ExecutionResult executionResult = fetchComputeResourceOperation(computeResource).executeCommand(finalCommand);
 
             if (executionResult.getExitStatus() == 0) {
-                publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.COMPLETED);
+                finishTaskExecution(taskContext, taskResource, "Out", TaskStatusResource.State.COMPLETED, "Task completed");
             } else if (executionResult.getExitStatus() == -1) {
-                publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, "Process didn't exit successfully");
+                publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, "Process didn't exit successfully");
             } else {
-                publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, "Process exited with error status " + executionResult.getExitStatus());
+                publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, "Process exited with error status " + executionResult.getExitStatus());
             }
 
         } catch (Exception e) {
 
             e.printStackTrace();
-            publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, e.getMessage());
+            publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, e.getMessage());
         }
     }
 }
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties
index d1a8582..188693e 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties
+++ b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties
@@ -1,5 +1,5 @@
 server.port = 8491
 api.server.url = api-server.default.svc.cluster.local:8080
-task.group.name = job-submission
+task.group.name = command-execution
 task.event.topic.name = airavata-task-event
-task.read.topic.name = airavata-task-job-submission
\ No newline at end of file
+task.read.topic.name = airavata-command
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java b/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java
index 8b83708..c3ed302 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java
@@ -56,10 +56,10 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
     @Override
     public void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception {
 
-        taskContext.getLocalContext().put(DataCollectingTaskInfo.REMOTE_SOURCE_PATH, findInput(taskResource, DataCollectingTaskInfo.REMOTE_SOURCE_PATH, false));
-        taskContext.getLocalContext().put(DataCollectingTaskInfo.IDENTIFIER, findInput(taskResource, DataCollectingTaskInfo.IDENTIFIER, false));
+        taskContext.getLocalContext().put(DataCollectingTaskInfo.REMOTE_SOURCE_PATH, findInput(taskContext, taskResource, DataCollectingTaskInfo.REMOTE_SOURCE_PATH, false));
+        taskContext.getLocalContext().put(DataCollectingTaskInfo.IDENTIFIER, findInput(taskContext, taskResource, DataCollectingTaskInfo.IDENTIFIER, false));
 
-        String computeId = findInput(taskResource, DataCollectingTaskInfo.COMPUTE_RESOURCE, false);
+        String computeId = findInput(taskContext, taskResource, DataCollectingTaskInfo.COMPUTE_RESOURCE, false);
         taskContext.getLocalContext().put(DataCollectingTaskInfo.COMPUTE_RESOURCE, this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
                 + "/compute/" + Long.parseLong(computeId), ComputeResource.class));
 
@@ -73,7 +73,7 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
             String identifier = (String) taskContext.getLocalContext().get(DataCollectingTaskInfo.IDENTIFIER);
             String remoteSourcePath = (String) taskContext.getLocalContext().get(DataCollectingTaskInfo.REMOTE_SOURCE_PATH);
 
-            publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.EXECUTING);
+            publishTaskStatus(taskContext, TaskStatusResource.State.EXECUTING);
 
             String temporaryFile = "/tmp/" + UUID.randomUUID().toString();
             System.out.println("Downloading " + remoteSourcePath + " to " + temporaryFile + " from compute resource "
@@ -94,12 +94,11 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
             getRestTemplate().exchange("http://" + getApiServerUrl() + "/data/" + taskResource.getId()+ "/"
                             + identifier + "/upload", HttpMethod.POST, requestEntity, Long.class);
 
-            publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(),
-                    TaskStatusResource.State.COMPLETED);
+            finishTaskExecution(taskContext, taskResource, "Out", TaskStatusResource.State.COMPLETED, "Task completed");
 
         } catch (Exception e) {
             e.printStackTrace();
-            publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, e.getMessage());
+            publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, e.getMessage());
 
         }
     }
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java b/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java
index cb3fada..88de3a5 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java
@@ -54,10 +54,10 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
     @Override
     public void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception {
 
-        taskContext.getLocalContext().put(DATA_LOCATION_ID, findInput(taskResource, DATA_LOCATION_ID, false));
-        taskContext.getLocalContext().put(REMOTE_TARGET_PATH, findInput(taskResource, REMOTE_TARGET_PATH, false));
+        taskContext.getLocalContext().put(DATA_LOCATION_ID, findInput(taskContext, taskResource, DATA_LOCATION_ID, false));
+        taskContext.getLocalContext().put(REMOTE_TARGET_PATH, findInput(taskContext, taskResource, REMOTE_TARGET_PATH, false));
 
-        String computeId = findInput(taskResource, COMPUTE_RESOURCE, false);
+        String computeId = findInput(taskContext, taskResource, COMPUTE_RESOURCE, false);
         taskContext.getLocalContext().put(COMPUTE_RESOURCE, this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
                 + "/compute/" + Long.parseLong(computeId), ComputeResource.class));
     }
@@ -70,14 +70,15 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
         ComputeResource computeResource = (ComputeResource) taskContext.getLocalContext().get(COMPUTE_RESOURCE);
 
         try {
-            publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.EXECUTING);
+            publishTaskStatus(taskContext, TaskStatusResource.State.EXECUTING);
             fetchComputeResourceOperation(computeResource).transferDataIn(dataLocationId, remoteTargetPath, "SCP");
-            publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.COMPLETED);
+            finishTaskExecution(taskContext, taskResource, "Out", TaskStatusResource.State.COMPLETED, "Task completed");
+
 
         } catch (Exception e) {
 
             e.printStackTrace();
-            publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED);
+            publishTaskStatus(taskContext, TaskStatusResource.State.FAILED);
         }
     }
 }
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java
index 1f958a7..69837a6 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java
@@ -2,6 +2,7 @@ package org.apache.airavata.k8s.task.api;
 
 import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
 import org.apache.airavata.k8s.api.resources.task.TaskInputResource;
+import org.apache.airavata.k8s.api.resources.task.TaskOutPortResource;
 import org.apache.airavata.k8s.api.resources.task.TaskResource;
 import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
 import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
@@ -54,7 +55,7 @@ public abstract class AbstractTaskExecutionService {
         System.out.println("Executing task " + taskContext.getTaskId());
         TaskResource taskResource = this.restTemplate.getForObject("http://" + apiServerUrl + "/task/" + taskContext.getTaskId(), TaskResource.class);
 
-        publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.SCHEDULED);
+        publishTaskStatus(taskContext, TaskStatusResource.State.SCHEDULED);
 
         this.executorService.execute(() -> {
             try {
@@ -80,7 +81,7 @@ public abstract class AbstractTaskExecutionService {
         return operations;
     }
 
-    public String findInput(TaskResource taskResource, String name, boolean optional) throws Exception {
+    public String findInput(TaskContext taskContext, TaskResource taskResource, String name, boolean optional) throws Exception {
 
         Optional<TaskInputResource> inputResource = taskResource.getInputs()
                 .stream()
@@ -92,7 +93,7 @@ public abstract class AbstractTaskExecutionService {
 
         } else {
             if (!optional) {
-                publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED,
+                publishTaskStatus(taskContext, TaskStatusResource.State.FAILED,
                         name + " is not available in inputs");
                 throw new Exception(name + " is not available in inputs");
             } else {
@@ -104,13 +105,26 @@ public abstract class AbstractTaskExecutionService {
     public abstract void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception;
     public abstract void executeTask(TaskResource taskResource, TaskContext taskContext);
 
-    public void publishTaskStatus(long processId, long taskId, int status) {
-        publishTaskStatus(processId, taskId, status, "");
+    public void publishTaskStatus(TaskContext taskContext, int status) {
+        publishTaskStatus(taskContext, status, "");
     }
 
-    public void publishTaskStatus(long processId, long taskId, int status, String reason) {
-        this.kafkaSender.send(this.taskEventPublishTopic, processId + "-" + taskId,
-                processId + "," + taskId + "," + status + "," + reason);
+    public void publishTaskStatus(TaskContext taskContext, int status, String reason) {
+        taskContext.setStatus(status);
+        taskContext.setReason(reason);
+
+        this.kafkaSender.send(this.taskEventPublishTopic, taskContext);
+    }
+
+    public void finishTaskExecution(TaskContext taskContext, TaskResource task, String outPortName, int status, String reason) throws Exception {
+        Optional<TaskOutPortResource> selectedOutPort = task.getOutPorts().stream().filter(outPort -> outPort.getName().equals(outPortName)).findFirst();
+        if (!selectedOutPort.isPresent()) {
+            throw new Exception("Selected out port " + outPortName + " does not exist in the task " + task.getName());
+        }
+
+        taskContext.setStatus(status);
+        taskContext.setReason(reason);
+        taskContext.setOutPortId(selectedOutPort.get().getId());
     }
 
     public RestTemplate getRestTemplate() {
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java
index 2fdf8af..e94beab 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java
@@ -1,5 +1,9 @@
 package org.apache.airavata.k8s.task.api;
 
+import org.apache.airavata.k8s.api.resources.task.TaskResource;
+import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
+
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -9,12 +13,50 @@ import java.util.Map;
  * @author dimuthu
  * @since 1.0.0-SNAPSHOT
  */
-public class TaskContext {
+public class TaskContext implements Serializable {
 
+    private long processId;
     private long taskId;
+    private int status;
+    private String reason;
+
+    public long getOutPortId() {
+        return outPortId;
+    }
+
+    public TaskContext setOutPortId(long outPortId) {
+        this.outPortId = outPortId;
+        return this;
+    }
+
+    private long outPortId;
     private Map<String, String> contextVariableParams = new HashMap<>();
     private Map<String, String> contextDataParams = new HashMap<>();
-    private Map<String, Object> localContext = new HashMap<>();
+    private transient Map<String, Object> localContext = new HashMap<>();
+
+    private void resetStatus() {
+        setStatus(-1);
+        setReason("");
+        setOutPortId(-1);
+        setProcessId(-1);
+        setTaskId(-1);
+    }
+
+    public void assignTask(TaskResource taskResource) {
+        resetStatus();
+        setTaskId(taskResource.getId());
+        setProcessId(taskResource.getParentProcessId());
+        setStatus(TaskStatusResource.State.SCHEDULED);
+    }
+
+    public void resetPublicContext() {
+        this.contextVariableParams = new HashMap<>();
+        this.contextDataParams = new HashMap<>();
+    }
+
+    public void resetLocalContext() {
+        this.localContext = new HashMap<>();
+    }
 
     public long getTaskId() {
         return taskId;
@@ -51,4 +93,32 @@ public class TaskContext {
         this.localContext = localContext;
         return this;
     }
+
+    public long getProcessId() {
+        return processId;
+    }
+
+    public TaskContext setProcessId(long processId) {
+        this.processId = processId;
+        return this;
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public TaskContext setStatus(int status) {
+        this.status = status;
+        return this;
+    }
+
+    public String getReason() {
+        return reason;
+    }
+
+    public TaskContext setReason(String reason) {
+        this.reason = reason;
+        return this;
+    }
+
 }
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java
index 524ce2e..b826d5b 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java
@@ -2,6 +2,7 @@ package org.apache.airavata.k8s.task.api;
 
 import org.apache.kafka.common.serialization.Deserializer;
 
+import java.io.*;
 import java.util.Map;
 
 /**
@@ -19,6 +20,24 @@ public class TaskContextDeserializer implements Deserializer<TaskContext> {
 
     @Override
     public TaskContext deserialize(String topic, byte[] data) {
+        ByteArrayInputStream bis = new ByteArrayInputStream(data);
+        ObjectInput in = null;
+        try {
+            in = new ObjectInputStream(bis);
+            return(TaskContext)in.readObject();
+        } catch (IOException e) {
+            // ignore exception
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                if (in != null) {
+                    in.close();
+                }
+            } catch (IOException ex) {
+                // ignore close exception
+            }
+        }
         return null;
     }
 
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java
index eb4c762..0edac4b 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java
@@ -2,6 +2,10 @@ package org.apache.airavata.k8s.task.api;
 
 import org.apache.kafka.common.serialization.Serializer;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
 import java.util.Map;
 
 /**
@@ -18,7 +22,23 @@ public class TaskContextSerializer implements Serializer<TaskContext> {
 
     @Override
     public byte[] serialize(String topic, TaskContext data) {
-        return new byte[0];
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutput out = null;
+        try {
+            out = new ObjectOutputStream(bos);
+            out.writeObject(data);
+            out.flush();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            // ignore catch
+        } finally {
+            try {
+                bos.close();
+            } catch (IOException ex) {
+                // ignore close exception
+            }
+        }
+        return null;
     }
 
     @Override
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java
index 307215f..b584833 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java
@@ -19,6 +19,7 @@
  */
 package org.apache.airavata.k8s.task.api.messaging;
 
+import org.apache.airavata.k8s.task.api.TaskContext;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.core.KafkaTemplate;
 
@@ -31,13 +32,13 @@ import org.springframework.kafka.core.KafkaTemplate;
 public class KafkaSender {
 
     @Autowired
-    private KafkaTemplate<String, String> kafkaTemplate;
+    private KafkaTemplate<String, TaskContext> kafkaTemplate;
 
-    public void send(String topic, String payload) {
-        kafkaTemplate.send(topic, payload);
+    public void send(String topic, TaskContext taskContext) {
+        kafkaTemplate.send(topic, taskContext);
     }
 
-    public void send(String topic, String key, String payload) {
-        kafkaTemplate.send(topic, key, payload);
+    public void send(String topic, String key, TaskContext taskContext) {
+        kafkaTemplate.send(topic, key, taskContext);
     }
 }
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java
index b078a79..8a09a4e 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java
@@ -19,6 +19,8 @@
  */
 package org.apache.airavata.k8s.task.api.messaging;
 
+import org.apache.airavata.k8s.task.api.TaskContext;
+import org.apache.airavata.k8s.task.api.TaskContextDeserializer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.beans.factory.annotation.Value;
@@ -59,7 +61,7 @@ public class ReceiverConfig {
         // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TaskContextDeserializer.class);
         // allows a pool of processes to divide the work of consuming and processing records
         props.put(ConsumerConfig.GROUP_ID_CONFIG, taskGroupName);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
@@ -67,13 +69,13 @@ public class ReceiverConfig {
     }
 
     @Bean
-    public ConsumerFactory<String, String> consumerFactory() {
-        return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
+    public ConsumerFactory<String, TaskContext> consumerFactory() {
+        return new DefaultKafkaConsumerFactory<String, TaskContext>(consumerConfigs());
     }
 
     @Bean
-    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
-        ConcurrentKafkaListenerContainerFactory<String, String> factory =
+    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, TaskContext>> kafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, TaskContext> factory =
                 new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(consumerFactory());
         factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java
index cd8b54b..e66e1fd 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java
@@ -19,6 +19,8 @@
  */
 package org.apache.airavata.k8s.task.api.messaging;
 
+import org.apache.airavata.k8s.task.api.TaskContext;
+import org.apache.airavata.k8s.task.api.TaskContextSerializer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.beans.factory.annotation.Value;
@@ -48,17 +50,17 @@ public class SenderConfig {
         // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TaskContextSerializer.class);
         return props;
     }
 
     @Bean
-    public ProducerFactory<String, String> producerFactory() {
-        return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
+    public ProducerFactory<String, TaskContext> producerFactory() {
+        return new DefaultKafkaProducerFactory<String, TaskContext>(producerConfigs());
     }
 
     @Bean
-    public KafkaTemplate<String, String> kafkaTemplate() {
+    public KafkaTemplate<String, TaskContext> kafkaTemplate() {
         return new KafkaTemplate<>(producerFactory());
     }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@airavata.apache.org" <co...@airavata.apache.org>.