Posted to commits@airavata.apache.org by is...@apache.org on 2021/06/01 17:59:18 UTC
[airavata-data-lake] 35/46: Serializing / deserializing task params
using bean methods. Supporting thread local parameters to avoid thread
contention in parallel job execution
This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
commit 97198401dac3149e69f7e875cbc0b3ad3b8c1e0b
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Fri May 21 02:40:01 2021 -0400
Serializing / deserializing task params using bean methods. Supporting thread local parameters to avoid thread contention in parallel job execution
---
data-orchestrator/workflow-engine/pom.xml | 7 +-
.../engine/services/participant/Participant.java | 1 +
.../services/wm/DataSyncWorkflowManager.java | 15 ++++-
.../engine/services/wm/WorkflowOperator.java | 30 ++++++---
.../workflow/engine/task/AbstractTask.java | 75 ++++++++++++++--------
.../engine/task/impl/ExampleBlockingTask.java | 17 ++++-
pom.xml | 2 +-
7 files changed, 108 insertions(+), 39 deletions(-)
diff --git a/data-orchestrator/workflow-engine/pom.xml b/data-orchestrator/workflow-engine/pom.xml
index ef96c4f..c698439 100644
--- a/data-orchestrator/workflow-engine/pom.xml
+++ b/data-orchestrator/workflow-engine/pom.xml
@@ -41,7 +41,7 @@
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
- <version>1.0.1</version>
+ <version>0.9.7</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -68,5 +68,10 @@
<artifactId>snakeyaml</artifactId>
<version>${yaml.version}</version>
</dependency>
+ <dependency>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils</artifactId>
+ <version>${commons.beanutils.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
index a4b9bdd..7b6fd9c 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
@@ -87,6 +87,7 @@ public class Participant implements CommandLineRunner {
InstanceConfig instanceConfig = new InstanceConfig(participantName);
instanceConfig.setHostName("localhost");
instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setMaxConcurrentTask(30);
zkHelixAdmin.addInstance(clusterName, instanceConfig);
logger.info("Participant: " + participantName + " has been added to cluster: " + clusterName);
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
index 1e987a6..869b7dd 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
@@ -57,13 +57,24 @@ public class DataSyncWorkflowManager implements CommandLineRunner {
ExampleBlockingTask bt2 = new ExampleBlockingTask();
bt2.setTaskId("bt2-" + UUID.randomUUID());
+ ExampleBlockingTask bt3 = new ExampleBlockingTask();
+ bt3.setTaskId("bt3-" + UUID.randomUUID());
+
+ ExampleBlockingTask bt4 = new ExampleBlockingTask();
+ bt4.setTaskId("bt4-" + UUID.randomUUID());
+
// Setting dependency
- bt1.setOutPort(new OutPort().setNextTaskId(bt2.getTaskId()));
+ bt1.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
+ bt2.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
+ bt4.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
Map<String, AbstractTask> taskMap = new HashMap<>();
taskMap.put(bt1.getTaskId(), bt1);
taskMap.put(bt2.getTaskId(), bt2);
- String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, bt1.getTaskId());
+ taskMap.put(bt3.getTaskId(), bt3);
+ taskMap.put(bt4.getTaskId(), bt4);
+ String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()};
+ String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
logger.info("Launched workflow {}", workflowId);
}
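With the change above, bt1, bt2 and bt4 become independent roots that all point at bt3 through their OutPorts, so bt3 acts as a fan-in join that Helix should only schedule after every parent job has completed. A compact sketch of the same wiring pattern, assuming an already-initialized workflowOperator and the ExampleBlockingTask/OutPort API shown in this patch (the names p1..p3 and "join" are illustrative):

    // Three independent roots and one join task: p1, p2, p3 -> join
    ExampleBlockingTask join = new ExampleBlockingTask();
    join.setTaskId("join-" + UUID.randomUUID());

    Map<String, AbstractTask> taskMap = new HashMap<>();
    List<String> startTaskIds = new ArrayList<>();
    for (int i = 1; i <= 3; i++) {
        ExampleBlockingTask parent = new ExampleBlockingTask();
        parent.setTaskId("p" + i + "-" + UUID.randomUUID());
        // Each parent declares the join task as its next task.
        parent.setOutPort(new OutPort().setNextTaskId(join.getTaskId()));
        taskMap.put(parent.getTaskId(), parent);
        startTaskIds.add(parent.getTaskId());
    }
    taskMap.put(join.getTaskId(), join);

    // Every root of the DAG is now passed to the operator, not a single start task.
    String workflowId = workflowOperator.buildAndRunWorkflow(
            taskMap, startTaskIds.toArray(new String[0]));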
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
index 8590eb5..2e52f05 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
@@ -23,18 +23,25 @@ import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskParamT
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.commons.beanutils.PropertyUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.task.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.util.*;
public class WorkflowOperator {
private static final long WORKFLOW_EXPIRY_TIME = 1 * 1000;
private static final long TASK_EXPIRY_TIME = 24 * 60 * 60 * 1000;
+ private static final int PARALLEL_JOBS_PER_WORKFLOW = 20;
+
+ private final static Logger logger = LoggerFactory.getLogger(WorkflowOperator.class);
private TaskDriver taskDriver;
private HelixManager helixManager;
@@ -64,7 +71,7 @@ public class WorkflowOperator {
}
}
- public String buildAndRunWorkflow(Map<String, AbstractTask> taskMap, String startTaskId) throws Exception {
+ public String buildAndRunWorkflow(Map<String, AbstractTask> taskMap, String[] startTaskIds) throws Exception {
if (taskDriver == null) {
throw new Exception("Workflow operator needs to be initialized");
@@ -72,9 +79,15 @@ public class WorkflowOperator {
String workflowName = UUID.randomUUID().toString();
Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName).setExpiry(0);
- buildWorkflowRecursively(workflowBuilder, startTaskId, taskMap);
- WorkflowConfig.Builder config = new WorkflowConfig.Builder().setFailureThreshold(0);
+ for (String startTaskId: startTaskIds) {
+ buildWorkflowRecursively(workflowBuilder, startTaskId, taskMap);
+ }
+
+ WorkflowConfig.Builder config = new WorkflowConfig.Builder()
+ .setFailureThreshold(0)
+ .setAllowOverlapJobAssignment(true);
+
workflowBuilder.setWorkflowConfig(config.build());
workflowBuilder.setExpiry(WORKFLOW_EXPIRY_TIME);
Workflow workflow = workflowBuilder.build();
@@ -112,6 +125,7 @@ public class WorkflowOperator {
for (OutPort outPort : outPorts) {
if (outPort != null) {
workflowBuilder.addParentChildDependency(currentTask.getTaskId(), outPort.getNextTaskId());
+ logger.info("Parent to child dependency {} -> {}", currentTask.getTaskId(), outPort.getNextTaskId());
buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap);
}
}
@@ -135,7 +149,7 @@ public class WorkflowOperator {
taskDriver.delete(workflowName);
}
- private <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException {
+ private <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
Map<String, String> result = new HashMap<>();
for (Class<?> c = data.getClass(); c != null; c = c.getSuperclass()) {
@@ -143,11 +157,11 @@ public class WorkflowOperator {
for (Field classField : fields) {
TaskParam parm = classField.getAnnotation(TaskParam.class);
if (parm != null) {
- classField.setAccessible(true);
- if (classField.get(data) instanceof TaskParamType) {
- result.put(parm.name(), TaskParamType.class.cast(classField.get(data)).serialize());
+ Object propertyValue = PropertyUtils.getProperty(data, parm.name());
+ if (propertyValue instanceof TaskParamType) {
+ result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize());
} else {
- result.put(parm.name(), classField.get(data).toString());
+ result.put(parm.name(), propertyValue.toString());
}
}
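The serializeTaskData change reads each @TaskParam value through its bean getter (PropertyUtils.getProperty) instead of reflecting on the field directly, so a field that is now wrapped in a ThreadLocal can still expose a plain String or primitive through its accessor. A minimal standalone sketch of that idea, using a hypothetical DemoTask rather than the engine's classes (requires commons-beanutils on the classpath):

    import org.apache.commons.beanutils.PropertyUtils;

    public class BeanSerializationSketch {

        // Hypothetical task: the field is a ThreadLocal, but the bean property is a plain String.
        public static class DemoTask {
            private final ThreadLocal<String> taskId = new ThreadLocal<>();
            public String getTaskId() { return taskId.get(); }
            public void setTaskId(String id) { taskId.set(id); }
        }

        public static void main(String[] args) throws Exception {
            DemoTask task = new DemoTask();
            task.setTaskId("demo-task-1");

            // Reading by property name goes through getTaskId(), not the field,
            // so the serializer only ever sees the unwrapped String value.
            Object value = PropertyUtils.getProperty(task, "taskId");
            System.out.println(value); // demo-task-1
        }
    }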
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
index c9ceee9..42d8406 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
@@ -19,6 +19,7 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.commons.beanutils.PropertyUtils;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskResult;
@@ -26,27 +27,32 @@ import org.apache.helix.task.UserContentStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.beans.PropertyDescriptor;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
public abstract class AbstractTask extends UserContentStore implements Task {
private final static Logger logger = LoggerFactory.getLogger(AbstractTask.class);
- private TaskCallbackContext callbackContext;
+ private ThreadLocal<TaskCallbackContext> callbackContext = new ThreadLocal<>();
+ private BlockingQueue<TaskCallbackContext> callbackContextQueue = new LinkedBlockingQueue<>();
@TaskOutPort(name = "nextTask")
private OutPort outPort;
@TaskParam(name = "taskId")
- private String taskId;
+ private ThreadLocal<String> taskId = new ThreadLocal<>();
@TaskParam(name = "retryCount")
- private int retryCount = 3;
+ private ThreadLocal<Integer> retryCount = ThreadLocal.withInitial(()-> 3);
public AbstractTask() {
@@ -55,9 +61,17 @@ public abstract class AbstractTask extends UserContentStore implements Task {
@Override
public TaskResult run() {
try {
- String helixTaskId = this.callbackContext.getTaskConfig().getId();
+ TaskCallbackContext cbc = callbackContextQueue.poll();
+
+ if (cbc == null) {
+ logger.error("No callback context available");
+ throw new Exception("No callback context available");
+ }
+
+ this.callbackContext.set(cbc);
+ String helixTaskId = getCallbackContext().getTaskConfig().getId();
logger.info("Running task {}", helixTaskId);
- deserializeTaskData(this, this.callbackContext.getTaskConfig().getConfigMap());
+ deserializeTaskData(this, getCallbackContext().getTaskConfig().getConfigMap());
} catch (Exception e) {
logger.error("Failed at deserializing task data", e);
return new TaskResult(TaskResult.Status.FAILED, "Failed in deserializing task data");
@@ -83,27 +97,32 @@ public abstract class AbstractTask extends UserContentStore implements Task {
}
public int getRetryCount() {
- return retryCount;
+ return retryCount.get();
}
public void setRetryCount(int retryCount) {
- this.retryCount = retryCount;
+ this.retryCount.set(retryCount);
}
public TaskCallbackContext getCallbackContext() {
- return callbackContext;
+ return callbackContext.get();
}
public String getTaskId() {
- return taskId;
+ return taskId.get();
}
public void setTaskId(String taskId) {
- this.taskId = taskId;
+ this.taskId.set(taskId);
}
public void setCallbackContext(TaskCallbackContext callbackContext) {
- this.callbackContext = callbackContext;
+ logger.info("Setting callback context {}", callbackContext.getJobConfig().getId());
+ try {
+ this.callbackContextQueue.put(callbackContext);
+ } catch (InterruptedException e) {
+ logger.error("Failed to put callback context to the queue", e);
+ }
}
private <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException {
@@ -124,23 +143,27 @@ public abstract class AbstractTask extends UserContentStore implements Task {
if (param != null) {
if (params.containsKey(param.name())) {
classField.setAccessible(true);
- if (classField.getType().isAssignableFrom(String.class)) {
- classField.set(instance, params.get(param.name()));
- } else if (classField.getType().isAssignableFrom(Integer.class) ||
- classField.getType().isAssignableFrom(Integer.TYPE)) {
- classField.set(instance, Integer.parseInt(params.get(param.name())));
- } else if (classField.getType().isAssignableFrom(Long.class) ||
- classField.getType().isAssignableFrom(Long.TYPE)) {
- classField.set(instance, Long.parseLong(params.get(param.name())));
- } else if (classField.getType().isAssignableFrom(Boolean.class) ||
- classField.getType().isAssignableFrom(Boolean.TYPE)) {
- classField.set(instance, Boolean.parseBoolean(params.get(param.name())));
- } else if (TaskParamType.class.isAssignableFrom(classField.getType())) {
- Class<?> clazz = classField.getType();
- Constructor<?> ctor = clazz.getConstructor();
+ PropertyDescriptor propertyDescriptor = PropertyUtils.getPropertyDescriptor(this, classField.getName());
+ Method writeMethod = PropertyUtils.getWriteMethod(propertyDescriptor);
+ Class<?>[] methodParamType = writeMethod.getParameterTypes();
+ Class<?> writeParameterType = methodParamType[0];
+
+ if (writeParameterType.isAssignableFrom(String.class)) {
+ writeMethod.invoke(instance, params.get(param.name()));
+ } else if (writeParameterType.isAssignableFrom(Integer.class) ||
+ writeParameterType.isAssignableFrom(Integer.TYPE)) {
+ writeMethod.invoke(instance, Integer.parseInt(params.get(param.name())));
+ } else if (writeParameterType.isAssignableFrom(Long.class) ||
+ writeParameterType.isAssignableFrom(Long.TYPE)) {
+ writeMethod.invoke(instance, Long.parseLong(params.get(param.name())));
+ } else if (writeParameterType.isAssignableFrom(Boolean.class) ||
+ writeParameterType.isAssignableFrom(Boolean.TYPE)) {
+ writeMethod.invoke(instance, Boolean.parseBoolean(params.get(param.name())));
+ } else if (TaskParamType.class.isAssignableFrom(writeParameterType)) {
+ Constructor<?> ctor = writeParameterType.getConstructor();
Object obj = ctor.newInstance();
((TaskParamType)obj).deserialize(params.get(param.name()));
- classField.set(instance, obj);
+ writeMethod.invoke(instance, obj);
}
}
}
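The AbstractTask change keeps per-execution state (task id, retry count, callback context) in ThreadLocals and queues incoming callback contexts, so that when one shared task instance is run by several worker threads at once, each run() call claims its own context instead of racing the others. A minimal standalone sketch of that pattern, with hypothetical names rather than the engine's classes:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ThreadLocalTaskSketch {

        // One shared instance may be driven by many worker threads in parallel.
        static class SharedTask {
            private final BlockingQueue<String> pendingContexts = new LinkedBlockingQueue<>();
            private final ThreadLocal<String> currentContext = new ThreadLocal<>();

            // Called by the thread that registers work for this task.
            void offerContext(String context) {
                pendingContexts.offer(context);
            }

            // Called by each worker thread: it claims one queued context and keeps it
            // in a ThreadLocal, so parallel runs never overwrite each other's state.
            void run() {
                String ctx = pendingContexts.poll();
                if (ctx == null) {
                    throw new IllegalStateException("No context available for this run");
                }
                currentContext.set(ctx);
                System.out.println(Thread.currentThread().getName() + " -> " + currentContext.get());
            }
        }

        public static void main(String[] args) {
            SharedTask task = new SharedTask();
            for (int i = 0; i < 3; i++) {
                task.offerContext("ctx-" + i);
                new Thread(task::run, "worker-" + i).start();
            }
        }
    }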
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
index 93ec010..0c94839 100644
--- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
@@ -30,7 +30,22 @@ public class ExampleBlockingTask extends BlockingTask {
@Override
public TaskResult runBlockingCode() {
- logger.info("Running example blocking task {}", getTaskId());
+ logger.info("Starting task {}", getTaskId());
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ if (getTaskId().startsWith("bt1")) {
+ try {
+ logger.info("Task {} is sleeping", getTaskId());
+ Thread.sleep(10000);
+ //return new TaskResult(TaskResult.Status.FAILED, "Fail");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ logger.info("Ending task {}", getTaskId());
return new TaskResult(TaskResult.Status.COMPLETED, "Success");
}
}
diff --git a/pom.xml b/pom.xml
index 9489f35..6473d58 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,7 +149,7 @@
<spring-security.version>5.3.4.RELEASE</spring-security.version>
<yaml.version>1.15</yaml.version>
<spring.boot.version>2.2.1.RELEASE</spring.boot.version>
-
+ <commons.beanutils.version>1.9.4</commons.beanutils.version>
</properties>
</project>