You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2017/12/06 03:13:47 UTC
[airavata-sandbox] 03/19: Updated tasks to accept TaskContexts as
inputs
This is an automated email from the ASF dual-hosted git repository.
smarru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-sandbox.git
commit 7ee67aa756c3961bafc9573cd2c6a6b2f60e9bae
Author: dimuthu.upeksha2@gmail.com <Di...@1234>
AuthorDate: Wed Nov 8 17:06:59 2017 +0530
Updated tasks to accept TaskContexts as inputs
---
.../resources/process/ProcessStatusResource.java | 59 ++++++++---
.../k8s/api/resources/task/TaskDagResource.java | 40 ++++++++
.../api/resources/task/TaskOutPortResource.java | 10 ++
.../k8s/api/resources/task/TaskResource.java | 22 +++-
.../api/server/controller/ProcessController.java | 2 +
.../k8s/api/server/controller/TaskController.java | 7 ++
.../k8s/api/server/model/task/TaskModel.java | 11 ++
.../server/repository/task/TaskDAGRepository.java | 4 +
.../k8s/api/server/service/WorkflowService.java | 12 ++-
.../k8s/api/server/service/task/TaskService.java | 17 +++-
.../k8s/api/server/service/util/GraphParser.java | 4 +-
.../api/server/service/util/ToResourceUtil.java | 42 ++++++--
.../src/main/resources/application.properties | 3 +-
.../modules/microservices/task-scheduler/pom.xml | 5 +
.../k8s/gfac/core/ProcessLifeCycleManager.java | 113 +++++++++++++++++----
.../airavata/k8s/gfac/messaging/KafkaReceiver.java | 11 +-
.../airavata/k8s/gfac/messaging/KafkaSender.java | 8 +-
.../k8s/gfac/messaging/ReceiverConfig.java | 12 ++-
.../airavata/k8s/gfac/messaging/SenderConfig.java | 10 +-
.../airavata/k8s/gfac/service/WorkerService.java | 45 ++++----
.../src/main/resources/application.properties | 2 +-
.../k8s/task/job/service/TaskExecutionService.java | 20 ++--
.../src/main/resources/application.properties | 4 +-
.../task/egress/service/TaskExecutionService.java | 13 ++-
.../task/ingress/service/TaskExecutionService.java | 13 +--
.../k8s/task/api/AbstractTaskExecutionService.java | 30 ++++--
.../apache/airavata/k8s/task/api/TaskContext.java | 74 +++++++++++++-
.../k8s/task/api/TaskContextDeserializer.java | 19 ++++
.../k8s/task/api/TaskContextSerializer.java | 22 +++-
.../k8s/task/api/messaging/KafkaSender.java | 11 +-
.../k8s/task/api/messaging/ReceiverConfig.java | 12 ++-
.../k8s/task/api/messaging/SenderConfig.java | 10 +-
32 files changed, 518 insertions(+), 149 deletions(-)
diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessStatusResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessStatusResource.java
index cfaa212..40fc9b1 100644
--- a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessStatusResource.java
+++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessStatusResource.java
@@ -19,6 +19,9 @@
*/
package org.apache.airavata.k8s.api.resources.process;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* TODO: Class level comments please
*
@@ -88,20 +91,46 @@ public class ProcessStatusResource {
return this;
}
- public static final class State {
- public static final int CREATED = 0;
- public static final int VALIDATED = 1;
- public static final int STARTED = 2;
- public static final int PRE_PROCESSING = 3;
- public static final int CONFIGURING_WORKSPACE = 4;
- public static final int INPUT_DATA_STAGING = 5;
- public static final int EXECUTING = 6;
- public static final int MONITORING = 7;
- public static final int OUTPUT_DATA_STAGING = 8;
- public static final int POST_PROCESSING = 9;
- public static final int COMPLETED = 10;
- public static final int FAILED = 11;
- public static final int CANCELLING = 12;
- public static final int CANCELED = 13;
+ public static enum State {
+
+ CREATED(0),
+ VALIDATED(1),
+ STARTED(2),
+ PRE_PROCESSING(3),
+ CONFIGURING_WORKSPACE(4),
+ INPUT_DATA_STAGING(5),
+ EXECUTING(6),
+ MONITORING(7),
+ OUTPUT_DATA_STAGING(8),
+ POST_PROCESSING(9),
+ COMPLETED(10),
+ FAILED(11),
+ CANCELLING(12),
+ CANCELED(13);
+
+ private final int value;
+
+ private State(int value) {
+ this.value = value;
+ }
+
+ private static Map<Integer, State> map = new HashMap<>();
+
+ static {
+ for (State state : State.values()) {
+ map.put(state.value, state);
+ }
+ }
+
+ public static State valueOf(int taskState) {
+ return map.get(taskState);
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
}
}
diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskDagResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskDagResource.java
new file mode 100644
index 0000000..bfef611
--- /dev/null
+++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskDagResource.java
@@ -0,0 +1,40 @@
+package org.apache.airavata.k8s.api.resources.task;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class TaskDagResource {
+ private long id;
+ private TaskOutPortResource sourceOutPort;
+ private TaskResource targetTask;
+
+ public long getId() {
+ return id;
+ }
+
+ public TaskDagResource setId(long id) {
+ this.id = id;
+ return this;
+ }
+
+ public TaskOutPortResource getSourceOutPort() {
+ return sourceOutPort;
+ }
+
+ public TaskDagResource setSourceOutPort(TaskOutPortResource sourceOutPort) {
+ this.sourceOutPort = sourceOutPort;
+ return this;
+ }
+
+ public TaskResource getTargetTask() {
+ return targetTask;
+ }
+
+ public TaskDagResource setTargetTask(TaskResource targetTask) {
+ this.targetTask = targetTask;
+ return this;
+ }
+}
diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskOutPortResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskOutPortResource.java
index de4ef1b..2ea8b3f 100644
--- a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskOutPortResource.java
+++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskOutPortResource.java
@@ -10,6 +10,7 @@ public class TaskOutPortResource {
private long id;
private String name;
private int referenceId = 0;
+ private TaskResource taskResource;
public long getId() {
return id;
@@ -37,4 +38,13 @@ public class TaskOutPortResource {
this.referenceId = referenceId;
return this;
}
+
+ public TaskResource getTaskResource() {
+ return taskResource;
+ }
+
+ public TaskOutPortResource setTaskResource(TaskResource taskResource) {
+ this.taskResource = taskResource;
+ return this;
+ }
}
diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskResource.java
index bdc34d7..bb54ceb 100644
--- a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskResource.java
+++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskResource.java
@@ -19,6 +19,8 @@
*/
package org.apache.airavata.k8s.api.resources.task;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+
import java.util.ArrayList;
import java.util.List;
@@ -31,8 +33,9 @@ import java.util.List;
public class TaskResource {
private long id;
+ private int referenceId; // for workflows
private String name;
- private long taskTypeId;
+ private TaskTypeResource taskType;
private String taskTypeStr;
private long parentProcessId;
private long creationTime;
@@ -57,12 +60,12 @@ public class TaskResource {
return this;
}
- public long getTaskTypeId() {
- return taskTypeId;
+ public TaskTypeResource getTaskType() {
+ return taskType;
}
- public TaskResource setTaskTypeId(long taskTypeId) {
- this.taskTypeId = taskTypeId;
+ public TaskResource setTaskType(TaskTypeResource taskType) {
+ this.taskType = taskType;
return this;
}
@@ -199,6 +202,15 @@ public class TaskResource {
return this;
}
+ public int getReferenceId() {
+ return referenceId;
+ }
+
+ public TaskResource setReferenceId(int referenceId) {
+ this.referenceId = referenceId;
+ return this;
+ }
+
public static final class TaskTypes {
public static final int ENV_SETUP = 0;
public static final int INGRESS_DATA_STAGING = 1;
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/ProcessController.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/ProcessController.java
index 7dffe68..93a530c 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/ProcessController.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/ProcessController.java
@@ -20,6 +20,7 @@
package org.apache.airavata.k8s.api.server.controller;
import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
+import org.apache.airavata.k8s.api.resources.task.TaskDagResource;
import org.apache.airavata.k8s.api.server.ServerRuntimeException;
import org.apache.airavata.k8s.api.resources.process.ProcessResource;
import org.apache.airavata.k8s.api.server.service.ProcessService;
@@ -27,6 +28,7 @@ import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
+import java.util.Set;
/**
* TODO: Class level comments please
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/TaskController.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/TaskController.java
index 2fb2334..c72ba47 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/TaskController.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/TaskController.java
@@ -19,6 +19,7 @@
*/
package org.apache.airavata.k8s.api.server.controller;
+import org.apache.airavata.k8s.api.resources.task.TaskDagResource;
import org.apache.airavata.k8s.api.resources.task.TaskResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.api.server.ServerRuntimeException;
@@ -27,6 +28,7 @@ import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
+import java.util.Set;
/**
* TODO: Class level comments please
@@ -62,4 +64,9 @@ public class TaskController {
return this.taskService.findTaskStatusById(id)
.orElseThrow(() -> new ServerRuntimeException("Task status with id " + id + " not found"));
}
+
+ @GetMapping(path = "dag/{process_id}", produces = MediaType.APPLICATION_JSON_VALUE)
+ public Set<TaskDagResource> getDagForProcess(@PathVariable("process_id") long processId) {
+ return this.taskService.getDagForProcess(processId);
+ }
}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/task/TaskModel.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/task/TaskModel.java
index 247bae1..4006e57 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/task/TaskModel.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/task/TaskModel.java
@@ -42,6 +42,8 @@ public class TaskModel {
@GeneratedValue(strategy = GenerationType.AUTO)
private long id;
+ private int referenceId; // to track workflows
+
private String name;
@ManyToOne
@@ -211,4 +213,13 @@ public class TaskModel {
public void setTaskOutPorts(List<TaskOutPort> taskOutPorts) {
this.taskOutPorts = taskOutPorts;
}
+
+ public int getReferenceId() {
+ return referenceId;
+ }
+
+ public TaskModel setReferenceId(int referenceId) {
+ this.referenceId = referenceId;
+ return this;
+ }
}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/task/TaskDAGRepository.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/task/TaskDAGRepository.java
index b2cb7fa..dc64fc0 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/task/TaskDAGRepository.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/task/TaskDAGRepository.java
@@ -3,6 +3,9 @@ package org.apache.airavata.k8s.api.server.repository.task;
import org.apache.airavata.k8s.api.server.model.task.TaskDAG;
import org.springframework.data.repository.CrudRepository;
+import java.util.Iterator;
+import java.util.Optional;
+
/**
* TODO: Class level comments please
*
@@ -10,4 +13,5 @@ import org.springframework.data.repository.CrudRepository;
* @since 1.0.0-SNAPSHOT
*/
public interface TaskDAGRepository extends CrudRepository<TaskDAG, Long> {
+ public Iterable<TaskDAG> findBysourceOutPort_taskModel_parentProcess_id(long processId);
}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
index 6090c90..68abab4 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
@@ -3,7 +3,6 @@ package org.apache.airavata.k8s.api.server.service;
import org.apache.airavata.k8s.api.resources.process.ProcessResource;
import org.apache.airavata.k8s.api.resources.workflow.WorkflowResource;
import org.apache.airavata.k8s.api.server.ServerRuntimeException;
-import org.apache.airavata.k8s.api.server.model.process.ProcessModel;
import org.apache.airavata.k8s.api.server.model.task.TaskDAG;
import org.apache.airavata.k8s.api.server.model.task.TaskModel;
import org.apache.airavata.k8s.api.server.model.task.TaskOutPort;
@@ -12,9 +11,11 @@ import org.apache.airavata.k8s.api.server.repository.task.TaskDAGRepository;
import org.apache.airavata.k8s.api.server.repository.task.TaskOutPortRepository;
import org.apache.airavata.k8s.api.server.repository.task.TaskRepository;
import org.apache.airavata.k8s.api.server.repository.workflow.WorkflowRepository;
+import org.apache.airavata.k8s.api.server.service.messaging.MessagingService;
import org.apache.airavata.k8s.api.server.service.task.TaskService;
import org.apache.airavata.k8s.api.server.service.util.GraphParser;
import org.apache.airavata.k8s.api.server.service.util.ToResourceUtil;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
@@ -33,14 +34,19 @@ public class WorkflowService {
private ProcessService processService;
private TaskService taskService;
+ private MessagingService messagingService;
private WorkflowRepository workflowRepository;
private TaskOutPortRepository taskOutPortRepository;
private TaskRepository taskRepository;
private TaskDAGRepository taskDAGRepository;
+ @Value("${scheduler.topic.name}")
+ private String schedulerTopic;
+
public WorkflowService(ProcessService processService,
TaskService taskService,
+ MessagingService messagingService,
WorkflowRepository workflowRepository,
TaskOutPortRepository taskOutPortRepository,
TaskRepository taskRepository,
@@ -48,6 +54,7 @@ public class WorkflowService {
this.processService = processService;
this.taskService = taskService;
+ this.messagingService = messagingService;
this.workflowRepository = workflowRepository;
this.taskOutPortRepository = taskOutPortRepository;
this.taskRepository = taskRepository;
@@ -112,6 +119,8 @@ public class WorkflowService {
} catch (Exception e) {
throw new ServerRuntimeException("Failed to create workflow", e);
}
+
+ this.messagingService.send(schedulerTopic, processId + "");
return 0;
}
@@ -123,6 +132,7 @@ public class WorkflowService {
return workflowResources;
}
+ @SuppressWarnings("WeakerAccess")
public Optional<Workflow> findEntityById(long id) {
return this.workflowRepository.findById(id);
}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/task/TaskService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/task/TaskService.java
index d1b3828..9df55ab 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/task/TaskService.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/task/TaskService.java
@@ -19,6 +19,7 @@
*/
package org.apache.airavata.k8s.api.server.service.task;
+import org.apache.airavata.k8s.api.resources.task.TaskDagResource;
import org.apache.airavata.k8s.api.resources.task.TaskResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.api.server.ServerRuntimeException;
@@ -29,8 +30,7 @@ import org.apache.airavata.k8s.api.server.repository.task.type.TaskTypeRepositor
import org.apache.airavata.k8s.api.server.service.util.ToResourceUtil;
import org.springframework.stereotype.Service;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
/**
* TODO: Class level comments please
@@ -79,12 +79,13 @@ public class TaskService {
taskModel.setStartingTask(resource.isStartingTask());
taskModel.setStoppingTask(resource.isStoppingTask());
taskModel.setTaskDetail(resource.getTaskDetail());
+ taskModel.setReferenceId(resource.getReferenceId());
taskModel.setParentProcess(processRepository.findById(resource.getParentProcessId())
.orElseThrow(() -> new ServerRuntimeException("Can not find process with id " +
resource.getParentProcessId())));
- taskModel.setTaskType(taskTypeRepository.findById(resource.getTaskTypeId())
+ taskModel.setTaskType(taskTypeRepository.findById(resource.getTaskType().getId())
.orElseThrow(() -> new ServerRuntimeException("Can not find task type with id " +
- resource.getTaskTypeId())));
+ resource.getTaskType().getId())));
TaskModel savedTask = taskRepository.save(taskModel);
@@ -141,4 +142,12 @@ public class TaskService {
return ToResourceUtil.toResource(taskRepository.findById(id).get());
}
+ public Set<TaskDagResource> getDagForProcess(long processId) {
+ Set<TaskDagResource> taskDagResources = new HashSet<>();
+ Iterable<TaskDAG> taskDags = this.taskDAGRepository.findBysourceOutPort_taskModel_parentProcess_id(processId);
+ Optional.ofNullable(taskDags).ifPresent(dags -> dags.forEach(taskDAG -> {
+ taskDagResources.add(ToResourceUtil.toResource(taskDAG).get());
+ }));
+ return taskDagResources;
+ }
}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/GraphParser.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/GraphParser.java
index 9cf1630..9f919f7 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/GraphParser.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/GraphParser.java
@@ -4,6 +4,7 @@ import org.apache.airavata.k8s.api.resources.task.TaskInputResource;
import org.apache.airavata.k8s.api.resources.task.TaskOutPortResource;
import org.apache.airavata.k8s.api.resources.task.TaskOutputResource;
import org.apache.airavata.k8s.api.resources.task.TaskResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
import org.w3c.dom.*;
import org.xml.sax.InputSource;
@@ -55,12 +56,13 @@ public class GraphParser {
taskResource.setName(attr.getNodeValue());
} else if ("Type".equals(attr.getNodeName())) {
- taskResource.setTaskTypeId(Long.parseLong(attr.getNodeValue()));
+ taskResource.setTaskType(new TaskTypeResource().setId(Long.parseLong(attr.getNodeValue())));
} else if (attr.getNodeName().startsWith("in-") || attr.getNodeName().startsWith("out-")) {
} else if ("id".equals(attr.getNodeName())) {
id = Integer.parseInt(attr.getNodeValue());
+ taskResource.setReferenceId(id);
} else if (attr.getNodeName().startsWith("output-")) {
TaskOutputResource outputResource = new TaskOutputResource();
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
index 7631b01..acd94c9 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
@@ -37,16 +37,13 @@ import org.apache.airavata.k8s.api.server.model.experiment.ExperimentOutputData;
import org.apache.airavata.k8s.api.server.model.experiment.ExperimentStatus;
import org.apache.airavata.k8s.api.server.model.process.ProcessModel;
import org.apache.airavata.k8s.api.server.model.process.ProcessStatus;
-import org.apache.airavata.k8s.api.server.model.task.TaskInput;
-import org.apache.airavata.k8s.api.server.model.task.TaskModel;
+import org.apache.airavata.k8s.api.server.model.task.*;
import org.apache.airavata.k8s.api.resources.application.*;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.api.resources.experiment.ExperimentInputResource;
import org.apache.airavata.k8s.api.resources.experiment.ExperimentOutputResource;
import org.apache.airavata.k8s.api.resources.experiment.ExperimentResource;
import org.apache.airavata.k8s.api.resources.process.ProcessResource;
-import org.apache.airavata.k8s.api.server.model.task.TaskOutput;
-import org.apache.airavata.k8s.api.server.model.task.TaskStatus;
import org.apache.airavata.k8s.api.server.model.task.type.TaskInputType;
import org.apache.airavata.k8s.api.server.model.task.type.TaskModelType;
import org.apache.airavata.k8s.api.server.model.task.type.TaskOutPortType;
@@ -231,10 +228,11 @@ public class ToResourceUtil {
resource.setLastUpdateTime(taskModel.getLastUpdateTime());
resource.setCreationTime(taskModel.getCreationTime());
resource.setParentProcessId(taskModel.getParentProcess().getId());
- resource.setTaskTypeId(taskModel.getTaskType().getId());
+ resource.setTaskType(toResource(taskModel.getTaskType()).get());
resource.setTaskDetail(taskModel.getTaskDetail());
resource.setStartingTask(taskModel.isStartingTask());
resource.setStoppingTask(taskModel.isStoppingTask());
+ resource.setReferenceId(taskModel.getReferenceId());
Optional.ofNullable(taskModel.getTaskInputs())
.ifPresent(inputs ->
inputs.forEach(input -> resource.getInputs()
@@ -306,7 +304,14 @@ public class ToResourceUtil {
ProcessResource processResource = new ProcessResource();
processResource.setId(processModel.getId());
processResource.setLastUpdateTime(processModel.getLastUpdateTime());
- processResource.setExperimentId(processModel.getExperiment().getId());
+
+ Optional.ofNullable(processModel.getExperiment()).ifPresent(experiment -> {
+ processResource.setExperimentId(experiment.getId());
+ });
+ Optional.ofNullable(processModel.getWorkflow()).ifPresent(workflow -> {
+ processResource.setWorkflowId(workflow.getId());
+ });
+
processResource.setTaskDag(processModel.getTaskDag());
processResource.setCreationTime(processModel.getCreationTime());
Optional.ofNullable(processModel.getProcessStatuses())
@@ -418,4 +423,29 @@ public class ToResourceUtil {
return Optional.empty();
}
}
+
+ public static Optional<TaskDagResource> toResource(TaskDAG taskDAG) {
+ if (taskDAG != null) {
+ TaskDagResource resource = new TaskDagResource();
+ resource.setId(taskDAG.getId());
+ resource.setSourceOutPort(toResource(taskDAG.getSourceOutPort()).get());
+ resource.setTargetTask(toResource(taskDAG.getTargetTask()).get());
+ return Optional.of(resource);
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ public static Optional<TaskOutPortResource> toResource(TaskOutPort outPort) {
+ if (outPort != null) {
+ TaskOutPortResource resource = new TaskOutPortResource();
+ resource.setId(outPort.getId());
+ resource.setReferenceId(outPort.getReferenceId());
+ resource.setName(outPort.getName());
+ resource.setTaskResource(toResource(outPort.getTaskModel()).get());
+ return Optional.of(resource);
+ } else {
+ return Optional.empty();
+ }
+ }
}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/api-server/src/main/resources/application.properties
index 6f76911..bdcb9bc 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/resources/application.properties
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/resources/application.properties
@@ -2,4 +2,5 @@ spring.jpa.hibernate.ddl-auto=update
spring.datasource.url=jdbc:mysql://db.default.svc.cluster.local:3306/airavata
spring.datasource.username=root
spring.datasource.password=fun123
-launch.topic.name = airavata-launch
\ No newline at end of file
+launch.topic.name = airavata-launch
+scheduler.topic.name = airavata-scheduler
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml b/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
index d2dca7c..6372cb1 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
@@ -39,6 +39,11 @@
<artifactId>api-resource</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>task-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java
index 751b2ec..8508f92 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java
@@ -1,4 +1,4 @@
-/**
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,9 +20,11 @@
package org.apache.airavata.k8s.gfac.core;
import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
+import org.apache.airavata.k8s.api.resources.task.TaskOutPortResource;
import org.apache.airavata.k8s.api.resources.task.TaskResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.gfac.messaging.KafkaSender;
+import org.apache.airavata.k8s.task.api.TaskContext;
import org.springframework.web.client.RestTemplate;
import java.util.*;
@@ -36,45 +38,108 @@ import java.util.*;
public class ProcessLifeCycleManager {
private long processId;
- private List<TaskResource> taskDag;
- private Map<Long, Integer> taskPoint;
+ private List<TaskResource> tasks;
+ private TaskResource currentTask;
+ private Map<Long, Long> edgeMap;
+
private KafkaSender kafkaSender;
// Todo abstract out these parameters to reusable class
private final RestTemplate restTemplate;
private String apiServerUrl;
- public ProcessLifeCycleManager(long processId, List<TaskResource> tasks,
+ public ProcessLifeCycleManager(long processId, List<TaskResource> tasks, Map<Long, Long> edgeMap,
KafkaSender kafkaSender,
RestTemplate restTemplate, String apiServerUrl) {
this.processId = processId;
- this.taskDag = tasks;
+ this.tasks = tasks;
+ this.edgeMap = edgeMap;
this.kafkaSender = kafkaSender;
this.restTemplate = restTemplate;
this.apiServerUrl = apiServerUrl;
}
public void init() {
- taskDag.sort(Comparator.comparing(TaskResource::getOrder));
- taskPoint = new HashMap<>();
- for (int i = 0; i < taskDag.size(); i++) {
- taskPoint.put(taskDag.get(i).getId(), i);
+
+ Optional<TaskResource> startingTask = tasks.stream().filter(TaskResource::isStartingTask).findFirst();
+ if (startingTask.isPresent()) {
+ this.currentTask = startingTask.get();
+ } else {
+ System.out.println("No starting task for this process " + processId);
+ updateProcessStatus(ProcessStatusResource.State.CANCELED, "No starting task for this process");
}
+
+ }
+
+ public void start() {
updateProcessStatus(ProcessStatusResource.State.EXECUTING);
+ System.out.println("Starting process " + processId + " with task " + currentTask.getName());
+
+ TaskContext startContext = new TaskContext();
+ startContext.assignTask(currentTask);
+
+ submitTaskToQueue(currentTask.getTaskType().getTopicName(), startContext);
}
- public synchronized void onTaskStateChanged(long taskId, int state) {
- switch (state) {
+ public synchronized void onTaskStateChanged(TaskContext taskContext) {
+
+ updateProcessStatus(ProcessStatusResource.State.MONITORING, "Task moved to state "
+ + ProcessStatusResource.State.valueOf(taskContext.getStatus()).name());
+
+ if (taskContext.getTaskId() != currentTask.getId()) {
+ System.out.println("Incompatible task status received. " +
+ "Currently running task id " + currentTask.getId() + " received task id " + taskContext.getTaskId());
+ updateProcessStatus(ProcessStatusResource.State.FAILED, "Incompatible task status received. " +
+ "Currently running task id " + currentTask.getId() + " received task id " + taskContext.getTaskId());
+ return;
+ } else {
+ System.out.println("Compatible task status received");
+ }
+
+ switch (taskContext.getStatus()) {
case TaskStatusResource.State.COMPLETED:
- System.out.println("Task " + taskId + " was completed");
- Optional.ofNullable(this.taskPoint.get(taskId)).ifPresent(point -> {
- if (point + 1 < taskDag.size()) {
- TaskResource resource = taskDag.get(point + 1);
- submitTaskToQueue(resource);
+
+ if (currentTask.isStoppingTask()) {
+ System.out.println("Process completed with last task " + currentTask.getName());
+ updateProcessStatus(ProcessStatusResource.State.COMPLETED, "Process completed with last task " + currentTask.getName());
+
+ } else {
+ Optional<TaskOutPortResource> nextOutPort = currentTask.getOutPorts().stream()
+ .filter(port -> port.getId() == taskContext.getOutPortId()).findFirst();
+ if (nextOutPort.isPresent()) {
+
+ if (edgeMap.containsKey(nextOutPort.get().getId())) {
+ Long nextTaskId = edgeMap.get(nextOutPort.get().getId());
+ Optional<TaskResource> nextTask = tasks.stream().filter(task -> task.getId() == nextTaskId).findFirst();
+
+ if (nextTask.isPresent()) {
+
+ this.currentTask = nextTask.get();
+ taskContext.assignTask(this.currentTask);
+ System.out.println("Submitting next task " + this.currentTask.getName() + " of process " + processId);
+ submitTaskToQueue(this.currentTask.getTaskType().getTopicName(), taskContext);
+
+ } else {
+ System.out.println("Next task with id " + nextTaskId + " can not be found");
+ updateProcessStatus(ProcessStatusResource.State.FAILED, "Next task with id "
+ + nextTaskId + " can not be found");
+ return;
+ }
+
+ } else {
+ System.out.println("Incomplete graph. Next outport " + nextOutPort.get().getName()
+ + " of task " + currentTask.getName() + " ends with a no endpoint");
+ updateProcessStatus(ProcessStatusResource.State.FAILED, "Incomplete graph. Next outport "
+ + nextOutPort.get().getName() + " of task " + currentTask.getName()
+ + " ends with a no endpoint");
+ return;
+ }
} else {
- updateProcessStatus(ProcessStatusResource.State.COMPLETED);
+ System.out.println("Invalid out port " + taskContext.getOutPortId() + " for task " + taskContext.getTaskId());
+ updateProcessStatus(ProcessStatusResource.State.FAILED,
+ "Invalid out port " + taskContext.getOutPortId() + " for task " + taskContext.getTaskId());
}
- });
+ }
break;
case TaskStatusResource.State.FAILED:
updateProcessStatus(ProcessStatusResource.State.FAILED);
@@ -82,14 +147,20 @@ public class ProcessLifeCycleManager {
}
}
- public void submitTaskToQueue(TaskResource taskResource) {
+ private void submitTaskToQueue(String topicName, TaskContext taskContext) {
+ updateProcessStatus(ProcessStatusResource.State.MONITORING, "Submitting task " + taskContext.getTaskId() + " to queue");
+ kafkaSender.send(topicName, taskContext);
+ }
+ private void updateProcessStatus(ProcessStatusResource.State state) {
+ updateProcessStatus(state, "");
}
- private void updateProcessStatus(int state) {
+ private void updateProcessStatus(ProcessStatusResource.State state, String reason) {
this.restTemplate.postForObject("http://" + apiServerUrl + "/process/" + this.processId + "/status",
new ProcessStatusResource()
- .setState(state)
+ .setState(state.getValue())
+ .setReason(reason)
.setTimeOfStateChange(System.currentTimeMillis()),
Long.class);
}
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
index 43e7526..42aa43d 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
@@ -20,6 +20,7 @@
package org.apache.airavata.k8s.gfac.messaging;
import org.apache.airavata.k8s.gfac.service.WorkerService;
+import org.apache.airavata.k8s.task.api.TaskContext;
import org.springframework.kafka.annotation.KafkaListener;
import javax.annotation.Resource;
@@ -42,12 +43,8 @@ public class KafkaReceiver {
}
@KafkaListener(topics = "${task.event.topic.name}", containerFactory = "kafkaEventListenerContainerFactory")
- public void receiveTaskEvent(String payload) {
- System.out.println("received event=" + payload);
- String[] eventParts = payload.split(",");
- long processId = Long.parseLong(eventParts[0]);
- long taskId = Long.parseLong(eventParts[1]);
- int state = Integer.parseInt(eventParts[2]);
- workerService.onTaskStateEvent(processId, taskId, state);
+ public void receiveTaskEvent(TaskContext taskContext) {
+ System.out.println("received event for task id =" + taskContext.getTaskId());
+ workerService.onTaskStateEvent(taskContext);
}
}
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java
index f4afe30..c4df008 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java
@@ -19,6 +19,8 @@
*/
package org.apache.airavata.k8s.gfac.messaging;
+import org.apache.airavata.k8s.task.api.TaskContext;
+import org.apache.airavata.k8s.task.api.TaskContextSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
@@ -31,9 +33,9 @@ import org.springframework.kafka.core.KafkaTemplate;
public class KafkaSender {
@Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
+ private KafkaTemplate<String, TaskContext> kafkaTemplate;
- public void send(String topic, String payload) {
- kafkaTemplate.send(topic, payload);
+ public void send(String topic, TaskContext taskContext) {
+ kafkaTemplate.send(topic, taskContext);
}
}
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java
index cb94135..0b23bdd 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java
@@ -19,6 +19,8 @@
*/
package org.apache.airavata.k8s.gfac.messaging;
+import org.apache.airavata.k8s.task.api.TaskContext;
+import org.apache.airavata.k8s.task.api.TaskContextDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
@@ -69,7 +71,7 @@ public class ReceiverConfig {
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TaskContextDeserializer.class);
// create a random group for each consumer in order to read all events from all consumers
props.put(ConsumerConfig.GROUP_ID_CONFIG, "event-group-" + UUID.randomUUID().toString());
return props;
@@ -81,8 +83,8 @@ public class ReceiverConfig {
}
@Bean
- public ConsumerFactory<String, String> consumerFactoryForEvents() {
- return new DefaultKafkaConsumerFactory<String, String>(consumerConfigsForEvents());
+ public ConsumerFactory<String, TaskContext> consumerFactoryForEvents() {
+ return new DefaultKafkaConsumerFactory<String, TaskContext>(consumerConfigsForEvents());
}
@Bean
@@ -95,8 +97,8 @@ public class ReceiverConfig {
}
@Bean
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaEventListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory =
+ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, TaskContext>> kafkaEventListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory<String, TaskContext> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryForEvents());
return factory;
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java
index 3bd5303..4c6bf1e 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java
@@ -19,6 +19,8 @@
*/
package org.apache.airavata.k8s.gfac.messaging;
+import org.apache.airavata.k8s.task.api.TaskContext;
+import org.apache.airavata.k8s.task.api.TaskContextSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
@@ -48,17 +50,17 @@ public class SenderConfig {
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TaskContextSerializer.class);
return props;
}
@Bean
- public ProducerFactory<String, String> producerFactory() {
- return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
+ public ProducerFactory<String, TaskContext> producerFactory() {
+ return new DefaultKafkaProducerFactory<String, TaskContext>(producerConfigs());
}
@Bean
- public KafkaTemplate<String, String> kafkaTemplate() {
+ public KafkaTemplate<String, TaskContext> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
index 0f20138..5449951 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
@@ -20,18 +20,17 @@
package org.apache.airavata.k8s.gfac.service;
import org.apache.airavata.k8s.api.resources.process.ProcessResource;
+import org.apache.airavata.k8s.api.resources.task.TaskDagResource;
import org.apache.airavata.k8s.api.resources.task.TaskResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.gfac.core.ProcessLifeCycleManager;
import org.apache.airavata.k8s.gfac.messaging.KafkaSender;
+import org.apache.airavata.k8s.task.api.TaskContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
/**
* TODO: Class level comments please
@@ -59,30 +58,26 @@ public class WorkerService {
ProcessResource processResource = this.restTemplate.getForObject("http://" + apiServerUrl + "/process/" + processId,
ProcessResource.class);
List<TaskResource> taskResources = processResource.getTasks();
- boolean freshProcess = true;
- for (TaskResource taskResource : taskResources) {
- if (taskResource.getTaskStatus() != null && taskResource.getTaskStatus().size() > 0) {
- // Already partially completed process. This happens when the task scheduler is killed while processing a process
- TaskStatusResource lastStatusResource = taskResource.getTaskStatus().get(taskResource.getTaskStatus().size() - 1);
- // TODO continue from last state
- freshProcess = false;
- } else {
- // Fresh task
- }
- }
+ Set<TaskDagResource> takDagSet = this.restTemplate.getForObject("http://" + apiServerUrl + "/task/dag/"
+ + processId, Set.class);
- if (freshProcess) {
- System.out.println("Starting to execute process " + processId);
- ProcessLifeCycleManager manager = new ProcessLifeCycleManager(processId, taskResources, kafkaSender, restTemplate, apiServerUrl);
- manager.init();
- manager.submitTaskToQueue(taskResources.get(0));
- processLifecycleStore.put(processId, manager);
- }
+ final Map<Long, Long> edgeMap = new HashMap<>();
+ Optional.ofNullable(takDagSet)
+ .ifPresent(dags -> dags.forEach(dag ->
+ edgeMap.put(dag.getSourceOutPort().getId(), dag.getTargetTask().getId())));
+
+ System.out.println("Starting to execute process " + processId);
+ ProcessLifeCycleManager manager =
+ new ProcessLifeCycleManager(processId, taskResources, edgeMap, kafkaSender, restTemplate, apiServerUrl);
+
+ manager.init();
+ manager.start();
+ processLifecycleStore.put(processId, manager);
}
- public void onTaskStateEvent(long processId, long taskId, int state) {
- Optional.ofNullable(processLifecycleStore.get(processId))
- .ifPresent(manager -> manager.onTaskStateChanged(taskId, state));
+ public void onTaskStateEvent(TaskContext taskContext) {
+ Optional.ofNullable(processLifecycleStore.get(taskContext.getProcessId()))
+ .ifPresent(manager -> manager.onTaskStateChanged(taskContext));
}
}
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties
index c4aed73..c23a904 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties
@@ -1,5 +1,5 @@
server.port = 8195
api.server.url = api-server.default.svc.cluster.local:8080
scheduler.topic.name = airavata-scheduler
-scheduler.group.name = gfac
+scheduler.group.name = task-scheduler
task.event.topic.name = airavata-task-event
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java
index 23c147e..2fdb337 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java
+++ b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java
@@ -53,12 +53,12 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
@Override
public void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception {
- taskContext.getLocalContext().put(CommandTaskInfo.COMMAND, findInput(taskResource, CommandTaskInfo.COMMAND, false));
- taskContext.getLocalContext().put(CommandTaskInfo.ARGUMENTS, findInput(taskResource, CommandTaskInfo.ARGUMENTS, true));
- taskContext.getLocalContext().put(CommandTaskInfo.STD_OUT_PATH, findInput(taskResource, CommandTaskInfo.STD_OUT_PATH, false));
- taskContext.getLocalContext().put(CommandTaskInfo.STD_ERR_PATH, findInput(taskResource, CommandTaskInfo.STD_ERR_PATH, false));
+ taskContext.getLocalContext().put(CommandTaskInfo.COMMAND, findInput(taskContext, taskResource, CommandTaskInfo.COMMAND, false));
+ taskContext.getLocalContext().put(CommandTaskInfo.ARGUMENTS, findInput(taskContext, taskResource, CommandTaskInfo.ARGUMENTS, true));
+ taskContext.getLocalContext().put(CommandTaskInfo.STD_OUT_PATH, findInput(taskContext, taskResource, CommandTaskInfo.STD_OUT_PATH, false));
+ taskContext.getLocalContext().put(CommandTaskInfo.STD_ERR_PATH, findInput(taskContext, taskResource, CommandTaskInfo.STD_ERR_PATH, false));
- String computeId = findInput(taskResource, CommandTaskInfo.COMPUTE_RESOURCE, false);
+ String computeId = findInput(taskContext, taskResource, CommandTaskInfo.COMPUTE_RESOURCE, false);
taskContext.getLocalContext().put(CommandTaskInfo.COMPUTE_RESOURCE, this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
+ "/compute/" + Long.parseLong(computeId), ComputeResource.class));
@@ -75,7 +75,7 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
String stdErrPath = (String) taskContext.getLocalContext().get(CommandTaskInfo.STD_ERR_PATH);
String stdOutSuffix = " > " + stdOutPath + " 2> " + stdErrPath;
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.EXECUTING);
+ publishTaskStatus(taskContext, TaskStatusResource.State.EXECUTING);
String finalCommand = command + (arguments != null ? arguments : "") + stdOutSuffix;
@@ -84,17 +84,17 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
ExecutionResult executionResult = fetchComputeResourceOperation(computeResource).executeCommand(finalCommand);
if (executionResult.getExitStatus() == 0) {
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.COMPLETED);
+ finishTaskExecution(taskContext, taskResource, "Out", TaskStatusResource.State.COMPLETED, "Task completed");
} else if (executionResult.getExitStatus() == -1) {
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, "Process didn't exit successfully");
+ publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, "Process didn't exit successfully");
} else {
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, "Process exited with error status " + executionResult.getExitStatus());
+ publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, "Process exited with error status " + executionResult.getExitStatus());
}
} catch (Exception e) {
e.printStackTrace();
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, e.getMessage());
+ publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, e.getMessage());
}
}
}
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties
index d1a8582..188693e 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties
+++ b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties
@@ -1,5 +1,5 @@
server.port = 8491
api.server.url = api-server.default.svc.cluster.local:8080
-task.group.name = job-submission
+task.group.name = command-execution
task.event.topic.name = airavata-task-event
-task.read.topic.name = airavata-task-job-submission
\ No newline at end of file
+task.read.topic.name = airavata-command
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java b/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java
index 8b83708..c3ed302 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java
@@ -56,10 +56,10 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
@Override
public void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception {
- taskContext.getLocalContext().put(DataCollectingTaskInfo.REMOTE_SOURCE_PATH, findInput(taskResource, DataCollectingTaskInfo.REMOTE_SOURCE_PATH, false));
- taskContext.getLocalContext().put(DataCollectingTaskInfo.IDENTIFIER, findInput(taskResource, DataCollectingTaskInfo.IDENTIFIER, false));
+ taskContext.getLocalContext().put(DataCollectingTaskInfo.REMOTE_SOURCE_PATH, findInput(taskContext, taskResource, DataCollectingTaskInfo.REMOTE_SOURCE_PATH, false));
+ taskContext.getLocalContext().put(DataCollectingTaskInfo.IDENTIFIER, findInput(taskContext, taskResource, DataCollectingTaskInfo.IDENTIFIER, false));
- String computeId = findInput(taskResource, DataCollectingTaskInfo.COMPUTE_RESOURCE, false);
+ String computeId = findInput(taskContext, taskResource, DataCollectingTaskInfo.COMPUTE_RESOURCE, false);
taskContext.getLocalContext().put(DataCollectingTaskInfo.COMPUTE_RESOURCE, this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
+ "/compute/" + Long.parseLong(computeId), ComputeResource.class));
@@ -73,7 +73,7 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
String identifier = (String) taskContext.getLocalContext().get(DataCollectingTaskInfo.IDENTIFIER);
String remoteSourcePath = (String) taskContext.getLocalContext().get(DataCollectingTaskInfo.REMOTE_SOURCE_PATH);
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.EXECUTING);
+ publishTaskStatus(taskContext, TaskStatusResource.State.EXECUTING);
String temporaryFile = "/tmp/" + UUID.randomUUID().toString();
System.out.println("Downloading " + remoteSourcePath + " to " + temporaryFile + " from compute resource "
@@ -94,12 +94,11 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
getRestTemplate().exchange("http://" + getApiServerUrl() + "/data/" + taskResource.getId()+ "/"
+ identifier + "/upload", HttpMethod.POST, requestEntity, Long.class);
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(),
- TaskStatusResource.State.COMPLETED);
+ finishTaskExecution(taskContext, taskResource, "Out", TaskStatusResource.State.COMPLETED, "Task completed");
} catch (Exception e) {
e.printStackTrace();
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, e.getMessage());
+ publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, e.getMessage());
}
}
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java b/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java
index cb3fada..88de3a5 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java
@@ -54,10 +54,10 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
@Override
public void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception {
- taskContext.getLocalContext().put(DATA_LOCATION_ID, findInput(taskResource, DATA_LOCATION_ID, false));
- taskContext.getLocalContext().put(REMOTE_TARGET_PATH, findInput(taskResource, REMOTE_TARGET_PATH, false));
+ taskContext.getLocalContext().put(DATA_LOCATION_ID, findInput(taskContext, taskResource, DATA_LOCATION_ID, false));
+ taskContext.getLocalContext().put(REMOTE_TARGET_PATH, findInput(taskContext, taskResource, REMOTE_TARGET_PATH, false));
- String computeId = findInput(taskResource, COMPUTE_RESOURCE, false);
+ String computeId = findInput(taskContext, taskResource, COMPUTE_RESOURCE, false);
taskContext.getLocalContext().put(COMPUTE_RESOURCE, this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
+ "/compute/" + Long.parseLong(computeId), ComputeResource.class));
}
@@ -70,14 +70,15 @@ public class TaskExecutionService extends AbstractTaskExecutionService {
ComputeResource computeResource = (ComputeResource) taskContext.getLocalContext().get(COMPUTE_RESOURCE);
try {
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.EXECUTING);
+ publishTaskStatus(taskContext, TaskStatusResource.State.EXECUTING);
fetchComputeResourceOperation(computeResource).transferDataIn(dataLocationId, remoteTargetPath, "SCP");
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.COMPLETED);
+ finishTaskExecution(taskContext, taskResource, "Out", TaskStatusResource.State.COMPLETED, "Task completed");
+
} catch (Exception e) {
e.printStackTrace();
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED);
+ publishTaskStatus(taskContext, TaskStatusResource.State.FAILED);
}
}
}
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java
index 1f958a7..69837a6 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java
@@ -2,6 +2,7 @@ package org.apache.airavata.k8s.task.api;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.api.resources.task.TaskInputResource;
+import org.apache.airavata.k8s.api.resources.task.TaskOutPortResource;
import org.apache.airavata.k8s.api.resources.task.TaskResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
@@ -54,7 +55,7 @@ public abstract class AbstractTaskExecutionService {
System.out.println("Executing task " + taskContext.getTaskId());
TaskResource taskResource = this.restTemplate.getForObject("http://" + apiServerUrl + "/task/" + taskContext.getTaskId(), TaskResource.class);
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.SCHEDULED);
+ publishTaskStatus(taskContext, TaskStatusResource.State.SCHEDULED);
this.executorService.execute(() -> {
try {
@@ -80,7 +81,7 @@ public abstract class AbstractTaskExecutionService {
return operations;
}
- public String findInput(TaskResource taskResource, String name, boolean optional) throws Exception {
+ public String findInput(TaskContext taskContext, TaskResource taskResource, String name, boolean optional) throws Exception {
Optional<TaskInputResource> inputResource = taskResource.getInputs()
.stream()
@@ -92,7 +93,7 @@ public abstract class AbstractTaskExecutionService {
} else {
if (!optional) {
- publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED,
+ publishTaskStatus(taskContext, TaskStatusResource.State.FAILED,
name + " is not available in inputs");
throw new Exception(name + " is not available in inputs");
} else {
@@ -104,13 +105,26 @@ public abstract class AbstractTaskExecutionService {
public abstract void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception;
public abstract void executeTask(TaskResource taskResource, TaskContext taskContext);
- public void publishTaskStatus(long processId, long taskId, int status) {
- publishTaskStatus(processId, taskId, status, "");
+ public void publishTaskStatus(TaskContext taskContext, int status) {
+ publishTaskStatus(taskContext, status, "");
}
- public void publishTaskStatus(long processId, long taskId, int status, String reason) {
- this.kafkaSender.send(this.taskEventPublishTopic, processId + "-" + taskId,
- processId + "," + taskId + "," + status + "," + reason);
+ public void publishTaskStatus(TaskContext taskContext, int status, String reason) {
+ taskContext.setStatus(status);
+ taskContext.setReason(reason);
+
+ this.kafkaSender.send(this.taskEventPublishTopic, taskContext);
+ }
+
+ /**
+ * Finishes a task through one of its named out ports and publishes the resulting task context.
+ *
+ * @param taskContext context of the finished task; its status, reason and out port id are updated in place
+ * @param task task resource whose out ports are searched by name
+ * @param outPortName name of the out port the task leaves through
+ * @param status status value to record on the context
+ * @param reason reason text to record on the context
+ * @throws Exception if the task has no out port named {@code outPortName}
+ */
+ public void finishTaskExecution(TaskContext taskContext, TaskResource task, String outPortName, int status, String reason) throws Exception {
+ Optional<TaskOutPortResource> selectedOutPort = task.getOutPorts().stream().filter(outPort -> outPort.getName().equals(outPortName)).findFirst();
+ if (!selectedOutPort.isPresent()) {
+ throw new Exception("Selected out port " + outPortName + " does not exist in the task " + task.getName());
+ }
+
+ // Record the out port before publishing so the event carries it.
+ taskContext.setOutPortId(selectedOutPort.get().getId());
+
+ // publishTaskStatus stores status and reason on the context and sends it to the task event topic.
+ // Without this call the final state is only set locally and never leaves this service.
+ // NOTE(review): confirm no caller publishes the context again after this method returns.
+ publishTaskStatus(taskContext, status, reason);
+ }
public RestTemplate getRestTemplate() {
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java
index 2fdf8af..e94beab 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java
@@ -1,5 +1,9 @@
package org.apache.airavata.k8s.task.api;
+import org.apache.airavata.k8s.api.resources.task.TaskResource;
+import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
+
+import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@@ -9,12 +13,50 @@ import java.util.Map;
* @author dimuthu
* @since 1.0.0-SNAPSHOT
*/
-public class TaskContext {
+public class TaskContext implements Serializable {
+ private long processId;
private long taskId;
+ private int status;
+ private String reason;
+
+ public long getOutPortId() {
+ return outPortId;
+ }
+
+ public TaskContext setOutPortId(long outPortId) {
+ this.outPortId = outPortId;
+ return this;
+ }
+
+ private long outPortId;
private Map<String, String> contextVariableParams = new HashMap<>();
private Map<String, String> contextDataParams = new HashMap<>();
- private Map<String, Object> localContext = new HashMap<>();
+ private transient Map<String, Object> localContext = new HashMap<>();
+
+ private void resetStatus() {
+ setStatus(-1);
+ setReason("");
+ setOutPortId(-1);
+ setProcessId(-1);
+ setTaskId(-1);
+ }
+
+ public void assignTask(TaskResource taskResource) {
+ resetStatus();
+ setTaskId(taskResource.getId());
+ setProcessId(taskResource.getParentProcessId());
+ setStatus(TaskStatusResource.State.SCHEDULED);
+ }
+
+ public void resetPublicContext() {
+ this.contextVariableParams = new HashMap<>();
+ this.contextDataParams = new HashMap<>();
+ }
+
+ public void resetLocalContext() {
+ this.localContext = new HashMap<>();
+ }
public long getTaskId() {
return taskId;
@@ -51,4 +93,32 @@ public class TaskContext {
this.localContext = localContext;
return this;
}
+
+ public long getProcessId() {
+ return processId;
+ }
+
+ public TaskContext setProcessId(long processId) {
+ this.processId = processId;
+ return this;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public TaskContext setStatus(int status) {
+ this.status = status;
+ return this;
+ }
+
+ public String getReason() {
+ return reason;
+ }
+
+ public TaskContext setReason(String reason) {
+ this.reason = reason;
+ return this;
+ }
+
}
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java
index 524ce2e..b826d5b 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java
@@ -2,6 +2,7 @@ package org.apache.airavata.k8s.task.api;
import org.apache.kafka.common.serialization.Deserializer;
+import java.io.*;
import java.util.Map;
/**
@@ -19,6 +20,24 @@ public class TaskContextDeserializer implements Deserializer<TaskContext> {
+ /**
+ * Rebuilds a {@link TaskContext} from bytes written with Java object serialization.
+ *
+ * @param topic topic the record was read from (not used)
+ * @param data serialized task context, may be null
+ * @return the decoded task context, or null when {@code data} is null or cannot be decoded
+ */
@Override
public TaskContext deserialize(String topic, byte[] data) {
+ // A record without a value has nothing to decode; ByteArrayInputStream would throw on null.
+ if (data == null) {
+ return null;
+ }
+ // NOTE(review): Java native deserialization is unsafe on untrusted input; make sure only
+ // trusted producers can write to these topics, or move to a data-only format.
+ // try-with-resources closes the object stream on every path.
+ try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
+ return (TaskContext) in.readObject();
+ } catch (IOException | ClassNotFoundException e) {
+ // Keep the best-effort contract (null on failure) but do not hide the cause.
+ e.printStackTrace();
+ }
return null;
}
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java
index eb4c762..0edac4b 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java
@@ -2,6 +2,10 @@ package org.apache.airavata.k8s.task.api;
import org.apache.kafka.common.serialization.Serializer;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
import java.util.Map;
/**
@@ -18,7 +22,23 @@ public class TaskContextSerializer implements Serializer<TaskContext> {
@Override
public byte[] serialize(String topic, TaskContext data) {
- return new byte[0];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = null;
+ try {
+ out = new ObjectOutputStream(bos);
+ out.writeObject(data);
+ out.flush();
+ return bos.toByteArray();
+ } catch (IOException e) {
+ // ignore exception; control falls through to "return null" below
+ } finally {
+ try {
+ bos.close();
+ } catch (IOException ex) {
+ // ignore close exception
+ }
+ }
+ return null;
}
@Override
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java
index 307215f..b584833 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java
@@ -19,6 +19,7 @@
*/
package org.apache.airavata.k8s.task.api.messaging;
+import org.apache.airavata.k8s.task.api.TaskContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
@@ -31,13 +32,13 @@ import org.springframework.kafka.core.KafkaTemplate;
public class KafkaSender {
@Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
+ private KafkaTemplate<String, TaskContext> kafkaTemplate;
- public void send(String topic, String payload) {
- kafkaTemplate.send(topic, payload);
+ public void send(String topic, TaskContext taskContext) {
+ kafkaTemplate.send(topic, taskContext);
}
- public void send(String topic, String key, String payload) {
- kafkaTemplate.send(topic, key, payload);
+ public void send(String topic, String key, TaskContext taskContext) {
+ kafkaTemplate.send(topic, key, taskContext);
}
}
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java
index b078a79..8a09a4e 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java
@@ -19,6 +19,8 @@
*/
package org.apache.airavata.k8s.task.api.messaging;
+import org.apache.airavata.k8s.task.api.TaskContext;
+import org.apache.airavata.k8s.task.api.TaskContextDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
@@ -59,7 +61,7 @@ public class ReceiverConfig {
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TaskContextDeserializer.class);
// allows a pool of processes to divide the work of consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, taskGroupName);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
@@ -67,13 +69,13 @@ public class ReceiverConfig {
}
@Bean
- public ConsumerFactory<String, String> consumerFactory() {
- return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
+ public ConsumerFactory<String, TaskContext> consumerFactory() {
+ return new DefaultKafkaConsumerFactory<String, TaskContext>(consumerConfigs());
}
@Bean
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory =
+ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, TaskContext>> kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory<String, TaskContext> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java
index cd8b54b..e66e1fd 100644
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java
+++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java
@@ -19,6 +19,8 @@
*/
package org.apache.airavata.k8s.task.api.messaging;
+import org.apache.airavata.k8s.task.api.TaskContext;
+import org.apache.airavata.k8s.task.api.TaskContextSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
@@ -48,17 +50,17 @@ public class SenderConfig {
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TaskContextSerializer.class);
return props;
}
@Bean
- public ProducerFactory<String, String> producerFactory() {
- return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
+ public ProducerFactory<String, TaskContext> producerFactory() {
+ return new DefaultKafkaProducerFactory<String, TaskContext>(producerConfigs());
}
@Bean
- public KafkaTemplate<String, String> kafkaTemplate() {
+ public KafkaTemplate<String, TaskContext> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
--
To stop receiving notification emails like this one, please contact
"commits@airavata.apache.org" <co...@airavata.apache.org>.