You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/06/01 03:03:45 UTC
[airavata-data-lake] 38/42: Non-blocking task initial framework
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
commit 42c9da6378d34e54873a8cdf2f34c59eed2d3a56
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon May 24 04:14:13 2021 -0400
Non-blocking task initial framework
---
.../engine/services/participant/Participant.java | 21 ++++
.../services/wm/DataSyncWorkflowManager.java | 21 ++--
.../engine/services/wm/WorkflowOperator.java | 107 +++++++++++++++------
.../workflow/engine/task/AbstractTask.java | 8 +-
.../workflow/engine/task/NonBlockingTask.java | 41 +++++++-
.../engine/task/annotation/NonBlockingSection.java | 2 +-
.../engine/task/impl/ExampleBlockingTask.java | 2 +-
.../engine/task/impl/ExampleNonBlockingTask.java | 17 +++-
8 files changed, 179 insertions(+), 40 deletions(-)
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
index 7b6fd9c..65314ee 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
@@ -18,7 +18,9 @@
package org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
import org.apache.helix.InstanceType;
import org.apache.helix.examples.OnlineOfflineStateModelFactory;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -196,6 +198,25 @@ public class Participant implements CommandLineRunner {
throw e;
}
}
+
+ for (String className : nonBlockingTaskClasses) {
+ try {
+ logger.info("Loading non blocking task {}", className);
+ Class<?> taskClz = Class.forName(className);
+ Object taskObj = taskClz.getConstructor().newInstance();
+ NonBlockingTask nonBlockingTask = (NonBlockingTask) taskObj;
+ TaskFactory taskFactory = context -> {
+ nonBlockingTask.setCallbackContext(context);
+ return nonBlockingTask;
+ };
+ NonBlockingTaskDef nbtDef = nonBlockingTask.getClass().getAnnotation(NonBlockingTaskDef.class);
+ taskMap.put(nbtDef.name(), taskFactory);
+
+ } catch (ClassNotFoundException e) {
+ logger.error("Couldn't find a class with name {}", className);
+ throw e;
+ }
+ }
return taskMap;
}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
index 869b7dd..9f08751 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
@@ -20,6 +20,7 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
@@ -63,17 +64,23 @@ public class DataSyncWorkflowManager implements CommandLineRunner {
ExampleBlockingTask bt4 = new ExampleBlockingTask();
bt4.setTaskId("bt4-" + UUID.randomUUID());
+ ExampleNonBlockingTask nbt1 = new ExampleNonBlockingTask();
+ nbt1.setTaskId("nbt1-" + UUID.randomUUID());
+ nbt1.setCurrentSection(2);
+
// Setting dependency
- bt1.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
- bt2.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
- bt4.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
+ bt1.setOutPort(new OutPort().setNextTaskId(nbt1.getTaskId()));
+ //bt2.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
+ //bt4.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
Map<String, AbstractTask> taskMap = new HashMap<>();
taskMap.put(bt1.getTaskId(), bt1);
- taskMap.put(bt2.getTaskId(), bt2);
- taskMap.put(bt3.getTaskId(), bt3);
- taskMap.put(bt4.getTaskId(), bt4);
- String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()};
+ taskMap.put(nbt1.getTaskId(), nbt1);
+ //taskMap.put(bt2.getTaskId(), bt2);
+ //taskMap.put(bt3.getTaskId(), bt3);
+ //taskMap.put(bt4.getTaskId(), bt4);
+ //String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()};
+ String[] startTaskIds = {bt1.getTaskId()};
String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
logger.info("Launched workflow {}", workflowId);
}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
index 2e52f05..ba3ef58 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
@@ -18,9 +18,11 @@
package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskParamType;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
import org.apache.commons.beanutils.PropertyUtils;
@@ -96,38 +98,82 @@ public class WorkflowOperator {
return workflowName;
}
+ private void continueNonBlockingRest(Map<String, AbstractTask> taskMap, String nonBlockingTaskId, int currentSection) {
+
+ }
+
private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String nextTaskId, Map<String, AbstractTask> taskMap)
throws Exception{
AbstractTask currentTask = taskMap.get(nextTaskId);
- String taskType = currentTask.getClass().getAnnotation(BlockingTaskDef.class).name();
- TaskConfig.Builder taskBuilder = new TaskConfig.Builder()
- .setTaskId(currentTask.getTaskId())
- .setCommand(taskType);
- Map<String, String> paramMap = serializeTaskData(currentTask);
- paramMap.forEach(taskBuilder::addConfig);
+ if (currentTask == null) {
+ logger.error("Couldn't find a task with id {} in the task map", nextTaskId);
+ throw new Exception("Couldn't find a task with id " + nextTaskId +" in the task map");
+ }
- List<TaskConfig> taskBuilds = new ArrayList<>();
- taskBuilds.add(taskBuilder.build());
+ BlockingTaskDef blockingTaskDef = currentTask.getClass().getAnnotation(BlockingTaskDef.class);
+ NonBlockingTaskDef nonBlockingTaskDef = currentTask.getClass().getAnnotation(NonBlockingTaskDef.class);
- JobConfig.Builder job = new JobConfig.Builder()
- .addTaskConfigs(taskBuilds)
- .setFailureThreshold(0)
- .setExpiry(WORKFLOW_EXPIRY_TIME)
- .setTimeoutPerTask(TASK_EXPIRY_TIME)
- .setNumConcurrentTasksPerInstance(20)
- .setMaxAttemptsPerTask(currentTask.getRetryCount());
+ if (blockingTaskDef != null) {
+ String taskName = blockingTaskDef.name();
+ TaskConfig.Builder taskBuilder = new TaskConfig.Builder()
+ .setTaskId(currentTask.getTaskId())
+ .setCommand(taskName);
- workflowBuilder.addJob(currentTask.getTaskId(), job);
+ Map<String, String> paramMap = serializeTaskData(currentTask);
+ paramMap.forEach(taskBuilder::addConfig);
- List<OutPort> outPorts = getOutPortsOfTask(currentTask);
+ List<TaskConfig> taskBuilds = new ArrayList<>();
+ taskBuilds.add(taskBuilder.build());
- for (OutPort outPort : outPorts) {
- if (outPort != null) {
- workflowBuilder.addParentChildDependency(currentTask.getTaskId(), outPort.getNextTaskId());
- logger.info("Parent to child dependency {} -> {}", currentTask.getTaskId(), outPort.getNextTaskId());
- buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap);
+ JobConfig.Builder job = new JobConfig.Builder()
+ .addTaskConfigs(taskBuilds)
+ .setFailureThreshold(0)
+ .setExpiry(WORKFLOW_EXPIRY_TIME)
+ .setTimeoutPerTask(TASK_EXPIRY_TIME)
+ .setNumConcurrentTasksPerInstance(20)
+ .setMaxAttemptsPerTask(currentTask.getRetryCount());
+
+ workflowBuilder.addJob(currentTask.getTaskId(), job);
+
+ List<OutPort> outPorts = getOutPortsOfTask(currentTask);
+
+ for (OutPort outPort : outPorts) {
+ if (outPort != null) {
+ workflowBuilder.addParentChildDependency(currentTask.getTaskId(), outPort.getNextTaskId());
+ logger.info("Parent to child dependency {} -> {}", currentTask.getTaskId(), outPort.getNextTaskId());
+ buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap);
+ }
}
+ } else if (nonBlockingTaskDef != null) {
+
+ NonBlockingTask nbTask = (NonBlockingTask) currentTask;
+
+ String taskName = nonBlockingTaskDef.name();
+ TaskConfig.Builder taskBuilder = new TaskConfig.Builder()
+ .setTaskId(currentTask.getTaskId())
+ .setCommand(taskName);
+
+ Map<String, String> paramMap = serializeTaskData(currentTask);
+ paramMap.forEach(taskBuilder::addConfig);
+
+ List<TaskConfig> taskBuilds = new ArrayList<>();
+ taskBuilds.add(taskBuilder.build());
+
+ JobConfig.Builder job = new JobConfig.Builder()
+ .addTaskConfigs(taskBuilds)
+ .setFailureThreshold(0)
+ .setExpiry(WORKFLOW_EXPIRY_TIME)
+ .setTimeoutPerTask(TASK_EXPIRY_TIME)
+ .setNumConcurrentTasksPerInstance(20)
+ .setMaxAttemptsPerTask(currentTask.getRetryCount());
+
+ workflowBuilder.addJob(currentTask.getTaskId(), job);
+
+ continueNonBlockingRest(taskMap, nextTaskId, nbTask.getCurrentSection());
+ } else {
+ logger.error("Couldn't find the task def annotation in class {}", currentTask.getClass().getName());
+ throw new Exception("Couldn't find the task def annotation in class " + currentTask.getClass().getName());
}
}
@@ -156,13 +202,18 @@ public class WorkflowOperator {
Field[] fields = c.getDeclaredFields();
for (Field classField : fields) {
TaskParam parm = classField.getAnnotation(TaskParam.class);
- if (parm != null) {
- Object propertyValue = PropertyUtils.getProperty(data, parm.name());
- if (propertyValue instanceof TaskParamType) {
- result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize());
- } else {
- result.put(parm.name(), propertyValue.toString());
+ try {
+ if (parm != null) {
+ Object propertyValue = PropertyUtils.getProperty(data, parm.name());
+ if (propertyValue instanceof TaskParamType) {
+ result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize());
+ } else {
+ result.put(parm.name(), propertyValue.toString());
+ }
}
+ } catch (Exception e) {
+ logger.error("Failed to serialize task parameter {} in class {}", parm.name(), data.getClass().getName());
+ throw e;
}
TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
index 42d8406..89863fc 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
@@ -76,7 +76,13 @@ public abstract class AbstractTask extends UserContentStore implements Task {
logger.error("Failed at deserializing task data", e);
return new TaskResult(TaskResult.Status.FAILED, "Failed in deserializing task data");
}
- return onRun();
+
+ try {
+ return onRun();
+ } catch (Exception e) {
+ logger.error("Unknown error while running task {}", getTaskId(), e);
+ return new TaskResult(TaskResult.Status.FAILED, "Failed due to unknown error");
+ }
}
@Override
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
index 9d2532c..912d6a3 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
@@ -17,20 +17,59 @@
package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.commons.beanutils.PropertyUtils;
import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
public class NonBlockingTask extends AbstractTask {
+ private final static Logger logger = LoggerFactory.getLogger(NonBlockingTask.class);
+
+ @TaskParam(name = "currentSection")
+ private ThreadLocal<Integer> currentSection = new ThreadLocal<>();
+
public NonBlockingTask() {
}
@Override
public TaskResult onRun() {
- return null;
+ Class<?> c = this.getClass();
+ Method[] allMethods = c.getMethods();
+ for (Method method : allMethods) {
+ NonBlockingSection nbs = method.getAnnotation(NonBlockingSection.class);
+ if (nbs != null) {
+ if (nbs.sectionIndex() == getCurrentSection()) {
+ try {
+ Object result = method.invoke(this);
+ return (TaskResult) result;
+ } catch (Exception e) {
+ logger.error("Failed to invoke designated section {}", getCurrentSection(), e);
+ return new TaskResult(TaskResult.Status.FAILED,
+ "Failed to invoke designated section " + getCurrentSection());
+ }
+ }
+ }
+ }
+
+ logger.error("Couldn't find a section matching section id {}", getCurrentSection());
+ return new TaskResult(TaskResult.Status.FAILED, "Couldn't find a section matching section id " + getCurrentSection());
}
@Override
public void onCancel() {
}
+
+ public Integer getCurrentSection() {
+ return currentSection.get();
+ }
+
+ public void setCurrentSection(Integer currentSection) {
+ this.currentSection.set(currentSection);
+ }
}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
index 8047a9b..6fb97b5 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
@@ -25,5 +25,5 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface NonBlockingSection {
- public int order();
+ public int sectionIndex();
}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
index 0c94839..631d06f 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
@@ -39,7 +39,7 @@ public class ExampleBlockingTask extends BlockingTask {
if (getTaskId().startsWith("bt1")) {
try {
logger.info("Task {} is sleeping", getTaskId());
- Thread.sleep(10000);
+ Thread.sleep(1000);
//return new TaskResult(TaskResult.Status.FAILED, "Fail");
} catch (InterruptedException e) {
e.printStackTrace();
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
index 527d0a2..0bdd9c9 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
@@ -18,11 +18,26 @@
package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@NonBlockingTaskDef(name = "ExampleNonBlockingTask")
public class ExampleNonBlockingTask extends NonBlockingTask {
- public ExampleNonBlockingTask() {
+ private final static Logger logger = LoggerFactory.getLogger(ExampleNonBlockingTask.class);
+
+ @NonBlockingSection(sectionIndex = 1)
+ public TaskResult section1() {
+ logger.info("Running section 1");
+ return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+ }
+
+ @NonBlockingSection(sectionIndex = 2)
+ public TaskResult section2() {
+ logger.info("Running section 2");
+ return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
}
}