Posted to commits@airavata.apache.org by is...@apache.org on 2021/06/01 17:59:18 UTC

[airavata-data-lake] 35/46: Serializing / deserializing task params using bean methods. Supporting thread-local parameters to avoid thread contention in parallel job execution

This is an automated email from the ASF dual-hosted git repository.

isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git

commit 97198401dac3149e69f7e875cbc0b3ad3b8c1e0b
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Fri May 21 02:40:01 2021 -0400

    Serializing / deserializing task params using bean methods. Supporting thread-local parameters to avoid thread contention in parallel job execution
---
 data-orchestrator/workflow-engine/pom.xml          |  7 +-
 .../engine/services/participant/Participant.java   |  1 +
 .../services/wm/DataSyncWorkflowManager.java       | 15 ++++-
 .../engine/services/wm/WorkflowOperator.java       | 30 ++++++---
 .../workflow/engine/task/AbstractTask.java         | 75 ++++++++++++++--------
 .../engine/task/impl/ExampleBlockingTask.java      | 17 ++++-
 pom.xml                                            |  2 +-
 7 files changed, 108 insertions(+), 39 deletions(-)

diff --git a/data-orchestrator/workflow-engine/pom.xml b/data-orchestrator/workflow-engine/pom.xml
index ef96c4f..c698439 100644
--- a/data-orchestrator/workflow-engine/pom.xml
+++ b/data-orchestrator/workflow-engine/pom.xml
@@ -41,7 +41,7 @@
         <dependency>
             <groupId>org.apache.helix</groupId>
             <artifactId>helix-core</artifactId>
-            <version>1.0.1</version>
+            <version>0.9.7</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
@@ -68,5 +68,10 @@
             <artifactId>snakeyaml</artifactId>
             <version>${yaml.version}</version>
         </dependency>
+        <dependency>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+            <version>${commons.beanutils.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
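
The new commons-beanutils dependency brings in PropertyUtils, which the serializer changes below use to read and write task parameters through bean getters and setters instead of raw field reflection. A minimal, self-contained sketch of that access style (the Bean class and property name here are illustrative, not part of the commit):

    import org.apache.commons.beanutils.PropertyUtils;

    public class BeanAccessSketch {
        public static class Bean {
            private String taskId;
            public String getTaskId() { return taskId; }
            public void setTaskId(String taskId) { this.taskId = taskId; }
        }

        public static void main(String[] args) throws Exception {
            Bean bean = new Bean();
            // setProperty routes through setTaskId(), not the private field
            PropertyUtils.setProperty(bean, "taskId", "bt1-demo");
            // getProperty routes through getTaskId()
            System.out.println(PropertyUtils.getProperty(bean, "taskId"));
        }
    }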
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
index a4b9bdd..7b6fd9c 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
@@ -87,6 +87,7 @@ public class Participant implements CommandLineRunner {
                 InstanceConfig instanceConfig = new InstanceConfig(participantName);
                 instanceConfig.setHostName("localhost");
                 instanceConfig.setInstanceEnabled(true);
+                instanceConfig.setMaxConcurrentTask(30);
                 zkHelixAdmin.addInstance(clusterName, instanceConfig);
                 logger.info("Participant: " + participantName + " has been added to cluster: " + clusterName);
 
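setMaxConcurrentTask(30) lets Helix schedule up to 30 tasks on this participant at once, which is what exposes the shared-state problem the AbstractTask changes below solve. For context, a hedged sketch of how a participant can hand a task to Helix (whether this project's Participant reuses one instance per task type is an assumption inferred from the context-queue logic added to AbstractTask):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
    import org.apache.helix.task.TaskFactory;

    public class TaskRegistrySketch {
        public static Map<String, TaskFactory> buildRegistry(AbstractTask sharedTask) {
            Map<String, TaskFactory> registry = new HashMap<>();
            // "ExampleBlockingTask" is an illustrative registry key.
            registry.put("ExampleBlockingTask", context -> {
                // With a reused instance, 30 concurrent runs would race on its
                // fields; setCallbackContext now enqueues instead of overwriting.
                sharedTask.setCallbackContext(context);
                return sharedTask;
            });
            return registry;
        }
    }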
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
index 1e987a6..869b7dd 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
@@ -57,13 +57,24 @@ public class DataSyncWorkflowManager implements CommandLineRunner {
         ExampleBlockingTask bt2 = new ExampleBlockingTask();
         bt2.setTaskId("bt2-" + UUID.randomUUID());
 
+        ExampleBlockingTask bt3 = new ExampleBlockingTask();
+        bt3.setTaskId("bt3-" + UUID.randomUUID());
+
+        ExampleBlockingTask bt4 = new ExampleBlockingTask();
+        bt4.setTaskId("bt4-" + UUID.randomUUID());
+
         // Setting dependency
-        bt1.setOutPort(new OutPort().setNextTaskId(bt2.getTaskId()));
+        bt1.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
+        bt2.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
+        bt4.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
 
         Map<String, AbstractTask> taskMap = new HashMap<>();
         taskMap.put(bt1.getTaskId(), bt1);
         taskMap.put(bt2.getTaskId(), bt2);
-        String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, bt1.getTaskId());
+        taskMap.put(bt3.getTaskId(), bt3);
+        taskMap.put(bt4.getTaskId(), bt4);
+        String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()};
+        String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
         logger.info("Launched workflow {}", workflowId);
     }
 
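With two more tasks and out-ports on three of them, the example workflow is now a fan-in DAG rather than a chain: bt1, bt2, and bt4 are independent start tasks that all feed bt3, so bt3 runs only after all three parents complete. Sketched:

    bt1 --+
    bt2 --+--> bt3
    bt4 --+

Accordingly, buildAndRunWorkflow now accepts the full set of start task IDs and walks each root's subgraph when assembling the Helix workflow.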
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
index 8590eb5..2e52f05 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
@@ -23,18 +23,25 @@ import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskParamT
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.commons.beanutils.PropertyUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.task.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.util.*;
 
 public class WorkflowOperator {
 
     private static final long WORKFLOW_EXPIRY_TIME = 1 * 1000;
     private static final long TASK_EXPIRY_TIME = 24 * 60 * 60 * 1000;
+    private static final int PARALLEL_JOBS_PER_WORKFLOW = 20;
+
+    private final static Logger logger = LoggerFactory.getLogger(WorkflowOperator.class);
 
     private TaskDriver taskDriver;
     private HelixManager helixManager;
@@ -64,7 +71,7 @@ public class WorkflowOperator {
         }
     }
 
-    public String buildAndRunWorkflow(Map<String, AbstractTask> taskMap, String startTaskId) throws Exception {
+    public String buildAndRunWorkflow(Map<String, AbstractTask> taskMap, String[] startTaskIds) throws Exception {
 
         if (taskDriver == null) {
             throw new Exception("Workflow operator needs to be initialized");
@@ -72,9 +79,15 @@ public class WorkflowOperator {
 
         String workflowName = UUID.randomUUID().toString();
         Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName).setExpiry(0);
-        buildWorkflowRecursively(workflowBuilder, startTaskId, taskMap);
 
-        WorkflowConfig.Builder config = new WorkflowConfig.Builder().setFailureThreshold(0);
+        for (String startTaskId: startTaskIds) {
+            buildWorkflowRecursively(workflowBuilder, startTaskId, taskMap);
+        }
+
+        WorkflowConfig.Builder config = new WorkflowConfig.Builder()
+                .setFailureThreshold(0)
+                .setAllowOverlapJobAssignment(true);
+
         workflowBuilder.setWorkflowConfig(config.build());
         workflowBuilder.setExpiry(WORKFLOW_EXPIRY_TIME);
         Workflow workflow = workflowBuilder.build();
@@ -112,6 +125,7 @@ public class WorkflowOperator {
         for (OutPort outPort : outPorts) {
             if (outPort != null) {
                 workflowBuilder.addParentChildDependency(currentTask.getTaskId(), outPort.getNextTaskId());
+                logger.info("Parent to child dependency {} -> {}", currentTask.getTaskId(), outPort.getNextTaskId());
                 buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap);
             }
         }
@@ -135,7 +149,7 @@ public class WorkflowOperator {
         taskDriver.delete(workflowName);
     }
 
-    private <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException {
+    private <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
 
         Map<String, String> result = new HashMap<>();
         for (Class<?> c = data.getClass(); c != null; c = c.getSuperclass()) {
@@ -143,11 +157,11 @@ public class WorkflowOperator {
             for (Field classField : fields) {
                 TaskParam parm = classField.getAnnotation(TaskParam.class);
                 if (parm != null) {
-                    classField.setAccessible(true);
-                    if (classField.get(data) instanceof TaskParamType) {
-                        result.put(parm.name(), TaskParamType.class.cast(classField.get(data)).serialize());
+                    Object propertyValue = PropertyUtils.getProperty(data, parm.name());
+                    if (propertyValue instanceof TaskParamType) {
+                        result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize());
                     } else {
-                        result.put(parm.name(), classField.get(data).toString());
+                        result.put(parm.name(), propertyValue.toString());
                     }
                 }
 
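Moving serializeTaskData from Field.get to PropertyUtils.getProperty matters because the task fields below become ThreadLocals: reflective field access would serialize the ThreadLocal wrapper object itself, while the bean getter unwraps the current thread's value. A minimal runnable sketch of the difference (class and property names are illustrative):

    import java.lang.reflect.Field;
    import org.apache.commons.beanutils.PropertyUtils;

    public class GetterVsFieldSketch {
        private final ThreadLocal<String> taskId = new ThreadLocal<>();
        public String getTaskId() { return taskId.get(); }
        public void setTaskId(String id) { taskId.set(id); }

        public static void main(String[] args) throws Exception {
            GetterVsFieldSketch task = new GetterVsFieldSketch();
            task.setTaskId("bt1-demo");

            Field f = GetterVsFieldSketch.class.getDeclaredField("taskId");
            f.setAccessible(true);
            System.out.println(f.get(task));                               // java.lang.ThreadLocal@...
            System.out.println(PropertyUtils.getProperty(task, "taskId")); // bt1-demo
        }
    }

Note that this ties each @TaskParam name to a matching getter/setter pair: PropertyUtils resolves parm.name() against bean property names, so the annotation name and the property name must agree.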
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
index c9ceee9..42d8406 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
@@ -19,6 +19,7 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
 
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.commons.beanutils.PropertyUtils;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskResult;
@@ -26,27 +27,32 @@ import org.apache.helix.task.UserContentStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.beans.PropertyDescriptor;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public abstract class AbstractTask extends UserContentStore implements Task {
 
     private final static Logger logger = LoggerFactory.getLogger(AbstractTask.class);
 
-    private TaskCallbackContext callbackContext;
+    private ThreadLocal<TaskCallbackContext> callbackContext = new ThreadLocal<>();
+    private BlockingQueue<TaskCallbackContext> callbackContextQueue = new LinkedBlockingQueue<>();
 
     @TaskOutPort(name = "nextTask")
     private OutPort outPort;
 
     @TaskParam(name = "taskId")
-    private String taskId;
+    private ThreadLocal<String> taskId = new ThreadLocal<>();
 
     @TaskParam(name = "retryCount")
-    private int retryCount = 3;
+    private ThreadLocal<Integer> retryCount = ThreadLocal.withInitial(()-> 3);
 
     public AbstractTask() {
 
@@ -55,9 +61,17 @@ public abstract class AbstractTask extends UserContentStore implements Task {
     @Override
     public TaskResult run() {
         try {
-            String helixTaskId = this.callbackContext.getTaskConfig().getId();
+            TaskCallbackContext cbc = callbackContextQueue.poll();
+
+            if (cbc == null) {
+                logger.error("No callback context available");
+                throw new Exception("No callback context available");
+            }
+
+            this.callbackContext.set(cbc);
+            String helixTaskId = getCallbackContext().getTaskConfig().getId();
             logger.info("Running task {}", helixTaskId);
-            deserializeTaskData(this, this.callbackContext.getTaskConfig().getConfigMap());
+            deserializeTaskData(this, getCallbackContext().getTaskConfig().getConfigMap());
         } catch (Exception e) {
             logger.error("Failed at deserializing task data", e);
             return new TaskResult(TaskResult.Status.FAILED, "Failed in deserializing task data");
@@ -83,27 +97,32 @@ public abstract class AbstractTask extends UserContentStore implements Task {
     }
 
     public int getRetryCount() {
-        return retryCount;
+        return retryCount.get();
     }
 
     public void setRetryCount(int retryCount) {
-        this.retryCount = retryCount;
+        this.retryCount.set(retryCount);
     }
 
     public TaskCallbackContext getCallbackContext() {
-        return callbackContext;
+        return callbackContext.get();
     }
 
     public String getTaskId() {
-        return taskId;
+        return taskId.get();
     }
 
     public void setTaskId(String taskId) {
-        this.taskId = taskId;
+        this.taskId.set(taskId);
     }
 
     public void setCallbackContext(TaskCallbackContext callbackContext) {
-        this.callbackContext = callbackContext;
+        logger.info("Setting callback context {}", callbackContext.getJobConfig().getId());
+        try {
+            this.callbackContextQueue.put(callbackContext);
+        } catch (InterruptedException e) {
+            logger.error("Failed to put callback context to the queue", e);
+        }
     }
 
     private <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException {
@@ -124,23 +143,27 @@ public abstract class AbstractTask extends UserContentStore implements Task {
             if (param != null) {
                 if (params.containsKey(param.name())) {
                     classField.setAccessible(true);
-                    if (classField.getType().isAssignableFrom(String.class)) {
-                        classField.set(instance, params.get(param.name()));
-                    } else if (classField.getType().isAssignableFrom(Integer.class) ||
-                            classField.getType().isAssignableFrom(Integer.TYPE)) {
-                        classField.set(instance, Integer.parseInt(params.get(param.name())));
-                    } else if (classField.getType().isAssignableFrom(Long.class) ||
-                            classField.getType().isAssignableFrom(Long.TYPE)) {
-                        classField.set(instance, Long.parseLong(params.get(param.name())));
-                    } else if (classField.getType().isAssignableFrom(Boolean.class) ||
-                            classField.getType().isAssignableFrom(Boolean.TYPE)) {
-                        classField.set(instance, Boolean.parseBoolean(params.get(param.name())));
-                    } else if (TaskParamType.class.isAssignableFrom(classField.getType())) {
-                        Class<?> clazz = classField.getType();
-                        Constructor<?> ctor = clazz.getConstructor();
+                    PropertyDescriptor propertyDescriptor = PropertyUtils.getPropertyDescriptor(this, classField.getName());
+                    Method writeMethod = PropertyUtils.getWriteMethod(propertyDescriptor);
+                    Class<?>[] methodParamType = writeMethod.getParameterTypes();
+                    Class<?> writeParameterType = methodParamType[0];
+
+                    if (writeParameterType.isAssignableFrom(String.class)) {
+                        writeMethod.invoke(instance, params.get(param.name()));
+                    } else if (writeParameterType.isAssignableFrom(Integer.class) ||
+                            writeParameterType.isAssignableFrom(Integer.TYPE)) {
+                        writeMethod.invoke(instance, Integer.parseInt(params.get(param.name())));
+                    } else if (writeParameterType.isAssignableFrom(Long.class) ||
+                            writeParameterType.isAssignableFrom(Long.TYPE)) {
+                        writeMethod.invoke(instance, Long.parseLong(params.get(param.name())));
+                    } else if (writeParameterType.isAssignableFrom(Boolean.class) ||
+                            writeParameterType.isAssignableFrom(Boolean.TYPE)) {
+                        writeMethod.invoke(instance, Boolean.parseBoolean(params.get(param.name())));
+                    } else if (TaskParamType.class.isAssignableFrom(writeParameterType)) {
+                        Constructor<?> ctor = writeParameterType.getConstructor();
                         Object obj = ctor.newInstance();
                         ((TaskParamType)obj).deserialize(params.get(param.name()));
-                        classField.set(instance, obj);
+                        writeMethod.invoke(instance, obj);
                     }
                 }
             }
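
Because a single AbstractTask instance can now back many concurrent Helix invocations, callback contexts are handed off through a queue and bound per thread: setCallbackContext enqueues, and each run() dequeues exactly one context into a ThreadLocal before deserializing its parameters. The same applies to taskId and retryCount, whose values are serialized from the submitting thread into the Helix config map and re-applied on each worker thread by deserializeTaskData. A compact sketch of the handoff pattern in isolation (generic names, not the project's API):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // One instance, many worker threads: each worker claims one pending
    // context and afterwards sees only its own value via the ThreadLocal.
    class PerThreadContext<C> {
        private final ThreadLocal<C> current = new ThreadLocal<>();
        private final BlockingQueue<C> pending = new LinkedBlockingQueue<>();

        void offer(C ctx) { pending.add(ctx); }  // producer: scheduling thread

        C bind() throws InterruptedException {   // consumer: start of run()
            C ctx = pending.take();              // exactly one context per run
            current.set(ctx);
            return ctx;
        }

        C get() { return current.get(); }        // later reads on the same thread
    }

Unlike this sketch's blocking take(), the diff uses poll() and fails the task fast when no context is queued, relying on Helix invoking setCallbackContext before each run.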
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
index 93ec010..0c94839 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
@@ -30,7 +30,22 @@ public class ExampleBlockingTask extends BlockingTask {
 
     @Override
     public TaskResult runBlockingCode() {
-        logger.info("Running example blocking task {}", getTaskId());
+        logger.info("Starting task {}", getTaskId());
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        if (getTaskId().startsWith("bt1")) {
+            try {
+                logger.info("Task {} is sleeping", getTaskId());
+                Thread.sleep(10000);
+                //return new TaskResult(TaskResult.Status.FAILED, "Fail");
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+        logger.info("Ending task {}", getTaskId());
         return new TaskResult(TaskResult.Status.COMPLETED, "Success");
     }
 }
diff --git a/pom.xml b/pom.xml
index 9489f35..6473d58 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,7 +149,7 @@
         <spring-security.version>5.3.4.RELEASE</spring-security.version>
         <yaml.version>1.15</yaml.version>
         <spring.boot.version>2.2.1.RELEASE</spring.boot.version>
-
+        <commons.beanutils.version>1.9.4</commons.beanutils.version>
     </properties>
 
 </project>