Posted to commits@airavata.apache.org by di...@apache.org on 2021/06/01 03:03:45 UTC

[airavata-data-lake] 38/42: Non blocking task initial framework

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git

commit 42c9da6378d34e54873a8cdf2f34c59eed2d3a56
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon May 24 04:14:13 2021 -0400

    Non blocking task initial framework
---
 .../engine/services/participant/Participant.java   |  21 ++++
 .../services/wm/DataSyncWorkflowManager.java       |  21 ++--
 .../engine/services/wm/WorkflowOperator.java       | 107 +++++++++++++++------
 .../workflow/engine/task/AbstractTask.java         |   8 +-
 .../workflow/engine/task/NonBlockingTask.java      |  41 +++++++-
 .../engine/task/annotation/NonBlockingSection.java |   2 +-
 .../engine/task/impl/ExampleBlockingTask.java      |   2 +-
 .../engine/task/impl/ExampleNonBlockingTask.java   |  17 +++-
 8 files changed, 179 insertions(+), 40 deletions(-)

diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
index 7b6fd9c..65314ee 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
@@ -18,7 +18,9 @@
 package org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant;
 
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
 import org.apache.helix.InstanceType;
 import org.apache.helix.examples.OnlineOfflineStateModelFactory;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -196,6 +198,25 @@ public class Participant implements CommandLineRunner {
                 throw e;
             }
         }
+
+        for (String className : nonBlockingTaskClasses) {
+            try {
+                logger.info("Loading non blocking task {}", className);
+                Class<?> taskClz = Class.forName(className);
+                Object taskObj = taskClz.getConstructor().newInstance();
+                NonBlockingTask nonBlockingTask = (NonBlockingTask) taskObj;
+                TaskFactory taskFactory = context -> {
+                    nonBlockingTask.setCallbackContext(context);
+                    return nonBlockingTask;
+                };
+                NonBlockingTaskDef nbtDef = nonBlockingTask.getClass().getAnnotation(NonBlockingTaskDef.class);
+                taskMap.put(nbtDef.name(), taskFactory);
+
+            } catch (ClassNotFoundException e) {
+                logger.error("Couldn't find a class with name {}", className);
+                throw e;
+            }
+        }
         return taskMap;
     }
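
The loop above registers each non-blocking task class by the name carried in its
@NonBlockingTaskDef annotation. Note that the factory lambda returns the same
nonBlockingTask instance for every callback context, so all Helix invocations of a
task type share one object. A minimal, self-contained sketch of the reflective
load-and-register pattern used here (the TaskDef annotation and registry map are
illustrative stand-ins, not the project's API):

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    import java.util.HashMap;
    import java.util.Map;

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface TaskDef {
        String name();
    }

    public class TaskLoaderSketch {
        public static Map<String, Object> loadTasks(String... classNames) throws Exception {
            Map<String, Object> registry = new HashMap<>();
            for (String className : classNames) {
                // Load by fully qualified name and instantiate via the no-arg constructor
                Class<?> clz = Class.forName(className);
                Object task = clz.getConstructor().newInstance();
                // The RUNTIME-retained annotation supplies the registry key
                TaskDef def = clz.getAnnotation(TaskDef.class);
                if (def == null) {
                    throw new IllegalStateException(className + " is missing @TaskDef");
                }
                registry.put(def.name(), task);
            }
            return registry;
        }
    }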
 
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
index 869b7dd..9f08751 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
@@ -20,6 +20,7 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.CommandLineRunner;
@@ -63,17 +64,23 @@ public class DataSyncWorkflowManager implements CommandLineRunner {
         ExampleBlockingTask bt4 = new ExampleBlockingTask();
         bt4.setTaskId("bt4-" + UUID.randomUUID());
 
+        ExampleNonBlockingTask nbt1 = new ExampleNonBlockingTask();
+        nbt1.setTaskId("nbt1-" + UUID.randomUUID());
+        nbt1.setCurrentSection(2);
+
         // Setting dependency
-        bt1.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
-        bt2.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
-        bt4.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
+        bt1.setOutPort(new OutPort().setNextTaskId(nbt1.getTaskId()));
+        //bt2.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
+        //bt4.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
 
         Map<String, AbstractTask> taskMap = new HashMap<>();
         taskMap.put(bt1.getTaskId(), bt1);
-        taskMap.put(bt2.getTaskId(), bt2);
-        taskMap.put(bt3.getTaskId(), bt3);
-        taskMap.put(bt4.getTaskId(), bt4);
-        String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()};
+        taskMap.put(nbt1.getTaskId(), nbt1);
+        //taskMap.put(bt2.getTaskId(), bt2);
+        //taskMap.put(bt3.getTaskId(), bt3);
+        //taskMap.put(bt4.getTaskId(), bt4);
+        //String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()};
+        String[] startTaskIds = {bt1.getTaskId()};
         String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
         logger.info("Launched workflow {}", workflowId);
     }
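
The manager now wires a single chain, bt1 -> nbt1, and starts the workflow from bt1
alone. A stand-alone sketch of the same out-port wiring idea with simplified
stand-in types (Task here is a toy with a single out-port, not the engine's
AbstractTask/OutPort pair):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    public class WiringSketch {

        // Toy stand-in: one task id plus a single downstream task id
        static class Task {
            String taskId;
            String nextTaskId;
        }

        public static void main(String[] args) {
            Task first = new Task();
            first.taskId = "bt1-" + UUID.randomUUID();

            Task second = new Task();
            second.taskId = "nbt1-" + UUID.randomUUID();

            // Dependency: first -> second, expressed by pointing the out-port
            // at the id of the next task
            first.nextTaskId = second.taskId;

            // The engine receives the full task map plus the entry-point ids
            Map<String, Task> taskMap = new HashMap<>();
            taskMap.put(first.taskId, first);
            taskMap.put(second.taskId, second);
            String[] startTaskIds = {first.taskId};
        }
    }
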
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
index 2e52f05..ba3ef58 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
@@ -18,9 +18,11 @@
 package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
 
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskParamType;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
 import org.apache.commons.beanutils.PropertyUtils;
@@ -96,38 +98,82 @@ public class WorkflowOperator {
         return workflowName;
     }
 
+    private void continueNonBlockingRest(Map<String, AbstractTask> taskMap, String nonBlockingTaskId, int currentSection) {
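+        // Initial-framework stub: continuation of the remaining non-blocking
+        // sections (from currentSection onwards) is not implemented yet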
+
+    }
+
     private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String nextTaskId, Map<String, AbstractTask> taskMap)
             throws Exception{
         AbstractTask currentTask = taskMap.get(nextTaskId);
-        String taskType = currentTask.getClass().getAnnotation(BlockingTaskDef.class).name();
-        TaskConfig.Builder taskBuilder = new TaskConfig.Builder()
-                .setTaskId(currentTask.getTaskId())
-                .setCommand(taskType);
 
-        Map<String, String> paramMap = serializeTaskData(currentTask);
-        paramMap.forEach(taskBuilder::addConfig);
+        if (currentTask == null) {
+            logger.error("Couldn't find a task with id {} in the task map", nextTaskId);
+            throw new Exception("Couldn't find a task with id " + nextTaskId + " in the task map");
+        }
 
-        List<TaskConfig> taskBuilds = new ArrayList<>();
-        taskBuilds.add(taskBuilder.build());
+        BlockingTaskDef blockingTaskDef = currentTask.getClass().getAnnotation(BlockingTaskDef.class);
+        NonBlockingTaskDef nonBlockingTaskDef = currentTask.getClass().getAnnotation(NonBlockingTaskDef.class);
 
-        JobConfig.Builder job = new JobConfig.Builder()
-                .addTaskConfigs(taskBuilds)
-                .setFailureThreshold(0)
-                .setExpiry(WORKFLOW_EXPIRY_TIME)
-                .setTimeoutPerTask(TASK_EXPIRY_TIME)
-                .setNumConcurrentTasksPerInstance(20)
-                .setMaxAttemptsPerTask(currentTask.getRetryCount());
+        if (blockingTaskDef != null) {
+            String taskName = blockingTaskDef.name();
+            TaskConfig.Builder taskBuilder = new TaskConfig.Builder()
+                    .setTaskId(currentTask.getTaskId())
+                    .setCommand(taskName);
 
-        workflowBuilder.addJob(currentTask.getTaskId(), job);
+            Map<String, String> paramMap = serializeTaskData(currentTask);
+            paramMap.forEach(taskBuilder::addConfig);
 
-        List<OutPort> outPorts = getOutPortsOfTask(currentTask);
+            List<TaskConfig> taskBuilds = new ArrayList<>();
+            taskBuilds.add(taskBuilder.build());
 
-        for (OutPort outPort : outPorts) {
-            if (outPort != null) {
-                workflowBuilder.addParentChildDependency(currentTask.getTaskId(), outPort.getNextTaskId());
-                logger.info("Parent to child dependency {} -> {}", currentTask.getTaskId(), outPort.getNextTaskId());
-                buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap);
+            JobConfig.Builder job = new JobConfig.Builder()
+                    .addTaskConfigs(taskBuilds)
+                    .setFailureThreshold(0)
+                    .setExpiry(WORKFLOW_EXPIRY_TIME)
+                    .setTimeoutPerTask(TASK_EXPIRY_TIME)
+                    .setNumConcurrentTasksPerInstance(20)
+                    .setMaxAttemptsPerTask(currentTask.getRetryCount());
+
+            workflowBuilder.addJob(currentTask.getTaskId(), job);
+
+            List<OutPort> outPorts = getOutPortsOfTask(currentTask);
+
+            for (OutPort outPort : outPorts) {
+                if (outPort != null) {
+                    workflowBuilder.addParentChildDependency(currentTask.getTaskId(), outPort.getNextTaskId());
+                    logger.info("Parent to child dependency {} -> {}", currentTask.getTaskId(), outPort.getNextTaskId());
+                    buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap);
+                }
             }
+        } else if (nonBlockingTaskDef != null) {
+
+            NonBlockingTask nbTask = (NonBlockingTask) currentTask;
+
+            String taskName = nonBlockingTaskDef.name();
+            TaskConfig.Builder taskBuilder = new TaskConfig.Builder()
+                    .setTaskId(currentTask.getTaskId())
+                    .setCommand(taskName);
+
+            Map<String, String> paramMap = serializeTaskData(currentTask);
+            paramMap.forEach(taskBuilder::addConfig);
+
+            List<TaskConfig> taskBuilds = new ArrayList<>();
+            taskBuilds.add(taskBuilder.build());
+
+            JobConfig.Builder job = new JobConfig.Builder()
+                    .addTaskConfigs(taskBuilds)
+                    .setFailureThreshold(0)
+                    .setExpiry(WORKFLOW_EXPIRY_TIME)
+                    .setTimeoutPerTask(TASK_EXPIRY_TIME)
+                    .setNumConcurrentTasksPerInstance(20)
+                    .setMaxAttemptsPerTask(currentTask.getRetryCount());
+
+            workflowBuilder.addJob(currentTask.getTaskId(), job);
+
+            continueNonBlockingRest(taskMap, nextTaskId, nbTask.getCurrentSection());
+        } else {
+            logger.error("Couldn't find the task def annotation in class {}", currentTask.getClass().getName());
+            throw new Exception("Couldn't find the task def annotation in class " + currentTask.getClass().getName());
         }
     }
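
Both branches above build an identical TaskConfig/JobConfig pair; what differs is
which annotation supplies the Helix command name, and that blocking tasks recurse
into their out-ports while non-blocking tasks defer the rest of their sections to
continueNonBlockingRest. A compact sketch of the dispatch-on-annotation pattern
(ToyBlocking and ToyNonBlocking are hypothetical annotations, not the project's):

    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;

    @Retention(RetentionPolicy.RUNTIME)
    @interface ToyBlocking { String name(); }

    @Retention(RetentionPolicy.RUNTIME)
    @interface ToyNonBlocking { String name(); }

    public class DispatchSketch {
        static String resolveCommand(Object task) {
            ToyBlocking b = task.getClass().getAnnotation(ToyBlocking.class);
            ToyNonBlocking nb = task.getClass().getAnnotation(ToyNonBlocking.class);
            if (b != null) {
                return b.name();   // blocking: recurse into downstream tasks
            } else if (nb != null) {
                return nb.name();  // non-blocking: schedule remaining sections separately
            }
            throw new IllegalArgumentException(
                    "No task def annotation on " + task.getClass().getName());
        }
    }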
 
@@ -156,13 +202,18 @@ public class WorkflowOperator {
             Field[] fields = c.getDeclaredFields();
             for (Field classField : fields) {
                 TaskParam parm = classField.getAnnotation(TaskParam.class);
-                if (parm != null) {
-                    Object propertyValue = PropertyUtils.getProperty(data, parm.name());
-                    if (propertyValue instanceof TaskParamType) {
-                        result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize());
-                    } else {
-                        result.put(parm.name(), propertyValue.toString());
+                try {
+                    if (parm != null) {
+                        Object propertyValue = PropertyUtils.getProperty(data, parm.name());
+                        if (propertyValue instanceof TaskParamType) {
+                            result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize());
+                        } else {
+                            result.put(parm.name(), propertyValue.toString());
+                        }
                     }
+                } catch (Exception e) {
+                    logger.error("Failed to serialize task parameter {} in class {}", parm.name(), data.getClass().getName(), e);
+                    throw e;
                 }
 
                 TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
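
serializeTaskData walks the @TaskParam-annotated fields of the task and reads each
one through its bean getter, so every failure is now logged with the offending
parameter name before being rethrown. A minimal sketch of the pattern (the Param
annotation and Bean class are illustrative; PropertyUtils.getProperty is the real
commons-beanutils call). String.valueOf is used here because it tolerates null
property values, which a plain toString() call would not:

    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.reflect.Field;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.commons.beanutils.PropertyUtils;

    public class SerializeSketch {

        @Retention(RetentionPolicy.RUNTIME)
        @interface Param { String name(); }

        public static class Bean {
            @Param(name = "retryCount")
            private int retryCount = 3;
            public int getRetryCount() { return retryCount; }
        }

        public static Map<String, String> serialize(Object bean) throws Exception {
            Map<String, String> out = new HashMap<>();
            for (Field f : bean.getClass().getDeclaredFields()) {
                Param p = f.getAnnotation(Param.class);
                if (p != null) {
                    // Reads through the public getter, so fields need bean accessors
                    Object value = PropertyUtils.getProperty(bean, p.name());
                    out.put(p.name(), String.valueOf(value));
                }
            }
            return out;
        }
    }
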
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
index 42d8406..89863fc 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
@@ -76,7 +76,13 @@ public abstract class AbstractTask extends UserContentStore implements Task {
             logger.error("Failed at deserializing task data", e);
             return new TaskResult(TaskResult.Status.FAILED, "Failed in deserializing task data");
         }
-        return onRun();
+
+        try {
+            return onRun();
+        } catch (Exception e) {
+            logger.error("Unknown error while running task {}", getTaskId(), e);
+            return new TaskResult(TaskResult.Status.FAILED, "Failed due to unknown error");
+        }
     }
 
     @Override
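
The guard added above means any exception escaping onRun() reaches Helix as a
FAILED TaskResult rather than as a raw exception. A self-contained sketch of the
same pattern against the real org.apache.helix.task interfaces (GuardedTask is a
simplified stand-in for AbstractTask, which also extends UserContentStore):

    import org.apache.helix.task.Task;
    import org.apache.helix.task.TaskResult;

    public abstract class GuardedTask implements Task {

        protected abstract TaskResult onRun() throws Exception;

        @Override
        public TaskResult run() {
            try {
                return onRun();
            } catch (Exception e) {
                // Report the failure as a result instead of letting it escape
                return new TaskResult(TaskResult.Status.FAILED,
                        "Failed due to unexpected error: " + e.getMessage());
            }
        }

        @Override
        public void cancel() {
            // no-op in this sketch
        }
    }
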
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
index 9d2532c..912d6a3 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
@@ -17,20 +17,59 @@
 
 package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
 
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.commons.beanutils.PropertyUtils;
 import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
 
 public class NonBlockingTask extends AbstractTask {
 
+    private static final Logger logger = LoggerFactory.getLogger(NonBlockingTask.class);
+
+    @TaskParam(name = "currentSection")
+    private ThreadLocal<Integer> currentSection = new ThreadLocal<>();
+
     public NonBlockingTask() {
     }
 
     @Override
     public TaskResult onRun() {
-        return null;
+        Class<?> c = this.getClass();
+        Method[] allMethods = c.getMethods();
+        for (Method method : allMethods) {
+            NonBlockingSection nbs = method.getAnnotation(NonBlockingSection.class);
+            if (nbs != null) {
+                if (nbs.sectionIndex() == getCurrentSection()) {
+                    try {
+                        Object result = method.invoke(this);
+                        return (TaskResult) result;
+                    } catch (Exception e) {
+                        logger.error("Failed to invoke designated section {}", getCurrentSection(), e);
+                        return new TaskResult(TaskResult.Status.FAILED,
+                                "Failed to invoke designated section " + getCurrentSection());
+                    }
+                }
+            }
+        }
+
+        logger.error("Couldn't find a section matching section id {}", getCurrentSection());
+        return new TaskResult(TaskResult.Status.FAILED, "Couldn't find a section matching section id " + getCurrentSection());
     }
 
     @Override
     public void onCancel() {
 
     }
+
+    public Integer getCurrentSection() {
+        return currentSection.get();
+    }
+
+    public void setCurrentSection(Integer currentSection) {
+        this.currentSection.set(currentSection);
+    }
 }
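
onRun() above scans the task's public methods for @NonBlockingSection and invokes
the one whose sectionIndex matches currentSection; the ThreadLocal keeps that
counter per-thread, which matters because Participant registers a single shared
instance per task type. A stand-alone sketch of the reflective section dispatch
(the Section annotation is a hypothetical mirror of @NonBlockingSection):

    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.reflect.Method;

    public class SectionDispatchSketch {

        @Retention(RetentionPolicy.RUNTIME)
        @interface Section { int index(); }

        @Section(index = 1)
        public String stepOne() { return "ran step 1"; }

        @Section(index = 2)
        public String stepTwo() { return "ran step 2"; }

        public String dispatch(int wanted) throws Exception {
            // getMethods() includes inherited public methods, so subclasses add sections
            for (Method m : getClass().getMethods()) {
                Section s = m.getAnnotation(Section.class);
                if (s != null && s.index() == wanted) {
                    return (String) m.invoke(this);
                }
            }
            throw new IllegalStateException("No section with index " + wanted);
        }

        public static void main(String[] args) throws Exception {
            System.out.println(new SectionDispatchSketch().dispatch(2)); // ran step 2
        }
    }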
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
index 8047a9b..6fb97b5 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
@@ -25,5 +25,5 @@ import java.lang.annotation.Target;
 @Retention(RetentionPolicy.RUNTIME)
 @Target(ElementType.METHOD)
 public @interface NonBlockingSection {
-    public int order();
+    public int sectionIndex();
 }
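
Renaming order() to sectionIndex() makes the element read as an absolute section
id rather than a relative ordering. Since the annotation is RUNTIME-retained, the
value can also be inspected up front; a sketch of a hypothetical startup check
(not part of this commit) that rejects duplicate indexes:

    import java.lang.reflect.Method;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection;

    public class SectionIndexCheck {
        static void checkUniqueIndexes(Class<?> taskClass) {
            Set<Integer> seen = new HashSet<>();
            for (Method m : taskClass.getMethods()) {
                NonBlockingSection s = m.getAnnotation(NonBlockingSection.class);
                if (s != null && !seen.add(s.sectionIndex())) {
                    throw new IllegalStateException("Duplicate sectionIndex "
                            + s.sectionIndex() + " in " + taskClass.getName());
                }
            }
        }
    }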
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
index 0c94839..631d06f 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
@@ -39,7 +39,7 @@ public class ExampleBlockingTask extends BlockingTask {
         if (getTaskId().startsWith("bt1")) {
             try {
                 logger.info("Task {} is sleeping", getTaskId());
-                Thread.sleep(10000);
+                Thread.sleep(1000);
                 //return new TaskResult(TaskResult.Status.FAILED, "Fail");
             } catch (InterruptedException e) {
                 e.printStackTrace();
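
One caveat with this sleep (in the unchanged catch block around it): printing the
InterruptedException swallows the interrupt. The conventional pattern restores the
flag so a cancelling caller can still observe it; a sketch:

    public class SleepSketch {
        static void pause(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                // Restore the flag so the interrupt remains visible to callers
                Thread.currentThread().interrupt();
            }
        }
    }
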
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
index 527d0a2..0bdd9c9 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
@@ -18,11 +18,26 @@
 package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
 
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @NonBlockingTaskDef(name = "ExampleNonBlockingTask")
 public class ExampleNonBlockingTask extends NonBlockingTask {
 
-    public ExampleNonBlockingTask() {
+    private static final Logger logger = LoggerFactory.getLogger(ExampleNonBlockingTask.class);
+
+    @NonBlockingSection(sectionIndex = 1)
+    public TaskResult section1() {
+        logger.info("Running section 1");
+        return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+    }
+
+    @NonBlockingSection(sectionIndex = 2)
+    public TaskResult section2() {
+        logger.info("Running section 2");
+        return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
     }
 }
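
Putting the pieces together: DataSyncWorkflowManager sets nbt1.setCurrentSection(2),
so the dispatch loop in NonBlockingTask.onRun() skips section1 and invokes section2.
A task author extends the framework simply by adding annotated methods; a
hypothetical third section (illustrative only, not part of this commit):

    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
    import org.apache.helix.task.TaskResult;

    @NonBlockingTaskDef(name = "ExtendedExampleTask")
    public class ExtendedExampleTask extends ExampleNonBlockingTask {

        @NonBlockingSection(sectionIndex = 3)
        public TaskResult section3() {
            // Each section is an independently schedulable unit; the framework
            // runs whichever one matches the task's currentSection value
            return new TaskResult(TaskResult.Status.COMPLETED, "Completed section 3");
        }
    }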