You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/06/01 03:03:41 UTC
[airavata-data-lake] 34/42: Workflow engine initial implementation
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
commit a49c3d4bb383682616d734b8a23c25c60fbf6260
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu May 20 15:42:06 2021 -0400
Workflow engine initial implementation
---
.gitignore | 3 +
data-orchestrator/pom.xml | 1 +
data-orchestrator/workflow-engine/README.md | 13 ++
data-orchestrator/workflow-engine/pom.xml | 72 ++++++
.../workflow/engine/monitor/AsyncEventMonitor.java | 21 ++
.../engine/services/controller/Controller.java | 94 ++++++++
.../engine/services/participant/Participant.java | 251 +++++++++++++++++++++
.../services/wm/DataSyncWorkflowManager.java | 73 ++++++
.../engine/services/wm/WorkflowOperator.java | 182 +++++++++++++++
.../workflow/engine/task/AbstractTask.java | 158 +++++++++++++
.../workflow/engine/task/BlockingTask.java | 42 ++++
.../workflow/engine/task/NonBlockingTask.java | 36 +++
.../orchestrator/workflow/engine/task/OutPort.java | 31 +++
.../workflow/engine/task/TaskParamType.java | 23 ++
.../engine/task/annotation/BlockingTaskDef.java | 29 +++
.../engine/task/annotation/NonBlockingSection.java | 29 +++
.../engine/task/annotation/NonBlockingTaskDef.java | 29 +++
.../engine/task/annotation/TaskOutPort.java | 29 +++
.../workflow/engine/task/annotation/TaskParam.java | 31 +++
.../engine/task/impl/ExampleBlockingTask.java | 36 +++
.../engine/task/impl/ExampleNonBlockingTask.java | 28 +++
.../src/main/resources/application.properties | 27 +++
.../workflow-engine/src/main/resources/logback.xml | 50 ++++
.../src/main/resources/task-list.yaml | 5 +
pom.xml | 2 +
25 files changed, 1295 insertions(+)
diff --git a/.gitignore b/.gitignore
index ab08b32..3583487 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,3 +20,6 @@ data-orchestrator/data-orchestrator-api/target
/metadata-service/db-service/client/client.iml
/metadata-service/db-service/server/server.iml
/metadata-service/db-service/stub/stub.iml
+/metadata-service/data-builders/data-builders.iml
+/metadata-service/db-service/db-service.iml
+/data-orchestrator/workflow-engine/workflow-engine.iml
\ No newline at end of file
diff --git a/data-orchestrator/pom.xml b/data-orchestrator/pom.xml
index 2d3ee54..ee7f550 100644
--- a/data-orchestrator/pom.xml
+++ b/data-orchestrator/pom.xml
@@ -35,6 +35,7 @@
<modules>
<module>data-orchestrator-api</module>
<module>data-orchestrator-core</module>
+ <module>workflow-engine</module>
</modules>
diff --git a/data-orchestrator/workflow-engine/README.md b/data-orchestrator/workflow-engine/README.md
new file mode 100644
index 0000000..0efb458
--- /dev/null
+++ b/data-orchestrator/workflow-engine/README.md
@@ -0,0 +1,13 @@
+### Service Execution Order
+
+* org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller.Controller
+* org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant.Participant
+* org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.DataSyncWorkflowManager
+
+### Configure the participant with new tasks
+
+* Create a task class that extends either the BlockingTask or the NonBlockingTask class
+* Implement the methods required by the chosen base class
+* Annotate the class with @BlockingTaskDef or @NonBlockingTaskDef annotations. See ExampleBlockingTask and ExampleNonBlockingTask
+* Register task in src/main/resources/task-list.yaml
+
diff --git a/data-orchestrator/workflow-engine/pom.xml b/data-orchestrator/workflow-engine/pom.xml
new file mode 100644
index 0000000..ef96c4f
--- /dev/null
+++ b/data-orchestrator/workflow-engine/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent> <!-- module of the data-orchestrator build -->
+ <artifactId>data-orchestrator</artifactId>
+ <groupId>org.apache.airavata.data.lake</groupId>
+ <version>0.01-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>workflow-engine</artifactId>
+
+ <properties> <!-- compile with Java 11 as both source and target level -->
+ <maven.compiler.source>11</maven.compiler.source>
+ <maven.compiler.target>11</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>1.0.1</version>
+ <exclusions> <!-- the log4j artifacts are excluded here; log4j-over-slf4j is declared as a dependency below -->
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ <version>${spring.boot.version}</version> <!-- property is not defined in this file; presumably inherited from a parent POM (confirm) -->
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${log4j.over.slf4j}</version> <!-- property is not defined in this file; presumably inherited from a parent POM (confirm) -->
+ </dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>${yaml.version}</version> <!-- property is not defined in this file; presumably inherited from a parent POM (confirm) -->
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
new file mode 100644
index 0000000..4afa566
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor;
+
+public class AsyncEventMonitor { // Empty placeholder: no fields or methods are defined yet.
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
new file mode 100644
index 0000000..83e6af5
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller;
+
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Spring Boot entry point that runs a standalone Helix controller for the workflow engine cluster.
+ * <p>
+ * On startup it creates the cluster named by {@code cluster.name} when it is missing, starts the
+ * controller and then waits on {@code stopLatch}, which nothing in this class releases.
+ */
+@SpringBootApplication()
+@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller")
+public class Controller implements CommandLineRunner {
+
+    private final static Logger logger = LoggerFactory.getLogger(Controller.class);
+
+    @org.springframework.beans.factory.annotation.Value("${cluster.name}")
+    private String clusterName;
+
+    @org.springframework.beans.factory.annotation.Value("${controller.name}")
+    private String controllerName;
+
+    @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
+    private String zkAddress;
+
+    private org.apache.helix.HelixManager zkHelixManager;
+
+    private CountDownLatch startLatch = new CountDownLatch(1);
+    private CountDownLatch stopLatch = new CountDownLatch(1);
+
+    /**
+     * Ensures the cluster exists, starts the Helix controller and then blocks on {@code stopLatch}.
+     * Failures are logged rather than rethrown and the controller is disconnected on the way out.
+     *
+     * @param args command line arguments (not used by this method)
+     */
+    @Override
+    public void run(String... args) throws Exception {
+        logger.info("Starting Cluster Controller ......");
+
+        try {
+            createClusterIfAbsent();
+
+            logger.info("Connection to helix cluster : " + clusterName + " with name : " + controllerName);
+            logger.info("Zookeeper connection string " + zkAddress);
+
+            zkHelixManager = HelixControllerMain.startHelixController(zkAddress, clusterName,
+                    controllerName, HelixControllerMain.STANDALONE);
+            startLatch.countDown();
+            stopLatch.await();
+        } catch (Exception ex) {
+            logger.error("Error in running the Controller: {}", controllerName, ex);
+        } finally {
+            disconnect();
+        }
+    }
+
+    /**
+     * Creates the Helix cluster in ZooKeeper if it is not registered yet. The admin handle and the
+     * ZooKeeper client are closed in finally blocks so that they are released even when the lookup
+     * or the cluster creation throws.
+     */
+    private void createClusterIfAbsent() {
+        ZkClient zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+        try {
+            ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
+            try {
+                if (!zkHelixAdmin.getClusters().contains(clusterName)) {
+                    zkHelixAdmin.addCluster(clusterName, true);
+                }
+            } finally {
+                zkHelixAdmin.close();
+            }
+        } finally {
+            zkClient.close();
+        }
+    }
+
+    /** Disconnects the Helix controller; does nothing when the controller was never started. */
+    private void disconnect() {
+        if (zkHelixManager != null) {
+            logger.info("Controller: {}, has disconnected from cluster: {}", controllerName, clusterName);
+            zkHelixManager.disconnect();
+        }
+    }
+
+    public static void main(String args[]) throws Exception {
+        // Forward the command line so that Spring Boot can apply overrides such as --cluster.name
+        SpringApplication.run(Controller.class, args);
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
new file mode 100644
index 0000000..a4b9bdd
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.*;
+import java.lang.reflect.Constructor;
+import java.util.*;
+
+/**
+ * Spring Boot entry point for a Helix participant that runs the tasks of the workflow engine.
+ * <p>
+ * On startup it reads the task class names from the configured task list file, adds itself to the
+ * cluster (or re-enables an existing instance), registers the OnlineOffline and Task state model
+ * factories and then blocks the calling thread until it is interrupted.
+ */
+@SpringBootApplication()
+@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant")
+public class Participant implements CommandLineRunner {
+
+    private final static Logger logger = LoggerFactory.getLogger(Participant.class);
+
+    // Sleep time in milliseconds for each round of the graceful shutdown loop in disconnect()
+    private int shutdownGracePeriod = 30000;
+    private int shutdownGraceRetries = 2;
+
+    @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
+    private String zkAddress;
+
+    @org.springframework.beans.factory.annotation.Value("${cluster.name}")
+    private String clusterName;
+
+    @org.springframework.beans.factory.annotation.Value("${participant.name}")
+    private String participantName;
+
+    @org.springframework.beans.factory.annotation.Value("${task.list.file}")
+    private String taskListFile;
+
+    private ZKHelixManager zkHelixManager;
+
+    private List<String> blockingTaskClasses = new ArrayList<>();
+    // NOTE(review): filled by loadTasks() but never registered with Helix anywhere in this class
+    private List<String> nonBlockingTaskClasses = new ArrayList<>();
+
+    // NOTE(review): nothing in this class adds entries here, so disconnect() never waits on it
+    private final List<String> runningTasks = Collections.synchronizedList(new ArrayList<String>());
+
+    /**
+     * Loads the task list, makes sure this participant is an enabled instance of the cluster and
+     * connects to Helix. The call blocks inside {@code connect()} until the thread is interrupted.
+     *
+     * @param args command line arguments (not used by this method)
+     * @throws Exception if the task list cannot be loaded
+     */
+    @Override
+    public void run(String... args) throws Exception {
+        logger.info("Starting Participant .....");
+
+        loadTasks();
+
+        ZkClient zkClient = null;
+        try {
+            zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                    ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+            ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
+
+            List<String> nodesInCluster = zkHelixAdmin.getInstancesInCluster(clusterName);
+
+            if (!nodesInCluster.contains(participantName)) {
+                InstanceConfig instanceConfig = new InstanceConfig(participantName);
+                instanceConfig.setHostName("localhost");
+                instanceConfig.setInstanceEnabled(true);
+                zkHelixAdmin.addInstance(clusterName, instanceConfig);
+                logger.info("Participant: " + participantName + " has been added to cluster: " + clusterName);
+
+            } else {
+                zkHelixAdmin.enableInstance(clusterName, participantName, true);
+                logger.debug("Participant: " + participantName + " has been re-enabled at the cluster: " + clusterName);
+            }
+
+            // Disable the instance and disconnect when the JVM goes down
+            Runtime.getRuntime().addShutdownHook(
+                    new Thread(() -> {
+                        logger.debug("Participant: " + participantName + " shutdown hook called");
+                        try {
+                            zkHelixAdmin.enableInstance(clusterName, participantName, false);
+                        } catch (Exception e) {
+                            logger.warn("Participant: " + participantName + " was not disabled normally", e);
+                        }
+                        disconnect();
+                    })
+            );
+
+            connect();
+
+        } catch (Exception ex) {
+            logger.error("Error running Participant: " + participantName + ", reason: " + ex, ex);
+        } finally {
+            if (zkClient != null) {
+                zkClient.close();
+            }
+        }
+    }
+
+    /**
+     * Reads the task list YAML and fills {@code blockingTaskClasses} and {@code nonBlockingTaskClasses}
+     * from the {@code blocking} and {@code nonBlocking} lists found under the {@code tasks} key.
+     *
+     * @throws Exception if the file cannot be found or yields no content
+     */
+    private void loadTasks() throws Exception {
+        Object load;
+        // try-with-resources closes the stream once the YAML has been parsed
+        try (InputStream stream = openTaskListStream()) {
+            load = new Yaml().load(stream);
+        } catch (FileNotFoundException e) {
+            logger.error("Failed to load task list from file {}", taskListFile, e);
+            throw e;
+        }
+
+        if (load == null) {
+            throw new Exception("Did not load the configuration from file " + taskListFile);
+        }
+        if (!(load instanceof Map)) {
+            return;
+        }
+
+        Object tasksObj = ((Map<?, ?>) load).get("tasks");
+        if (!(tasksObj instanceof Map)) {
+            return;
+        }
+        Map<?, ?> tasksMap = (Map<?, ?>) tasksObj;
+
+        Object blockingTaskObj = tasksMap.get("blocking");
+        if (blockingTaskObj instanceof List) {
+            blockingTaskClasses = toClassNames((List<?>) blockingTaskObj);
+            blockingTaskClasses.forEach(taskClz -> logger.info("Loading blocking task " + taskClz));
+        }
+
+        Object nonBlockingTaskObj = tasksMap.get("nonBlocking");
+        if (nonBlockingTaskObj instanceof List) {
+            nonBlockingTaskClasses = toClassNames((List<?>) nonBlockingTaskObj);
+            nonBlockingTaskClasses.forEach(taskClz -> logger.info("Loading non blocking task " + taskClz));
+        }
+    }
+
+    /**
+     * Opens the task list from the file system when the configured path exists and from the
+     * class path otherwise.
+     *
+     * @throws FileNotFoundException if the file is available in neither location
+     */
+    private InputStream openTaskListStream() throws FileNotFoundException {
+        File listFile = new File(taskListFile);
+        if (listFile.exists()) {
+            logger.info("Loading task list file {} from absolute path", taskListFile);
+            return new FileInputStream(listFile);
+        }
+
+        logger.info("Loading task list file {} from class path", taskListFile);
+        InputStream stream = Participant.class.getClassLoader().getResourceAsStream(taskListFile);
+        if (stream == null) {
+            throw new FileNotFoundException("Task list file " + taskListFile + " is not available in the class path");
+        }
+        return stream;
+    }
+
+    /** Copies the entries of a YAML list into a list of class names, skipping null entries. */
+    private List<String> toClassNames(List<?> entries) {
+        List<String> classNames = new ArrayList<>();
+        for (Object entry : entries) {
+            if (entry != null) {
+                classNames.add(entry.toString());
+            }
+        }
+        return classNames;
+    }
+
+    /**
+     * Builds one Helix task factory per configured blocking task class, keyed by the name declared
+     * in the {@link BlockingTaskDef} annotation of the class. Every factory call creates a new task
+     * instance so that tasks running at the same time do not share state.
+     *
+     * @throws Exception if a class cannot be found, is not a {@link BlockingTask}, lacks the
+     *                   annotation or has no public no-argument constructor
+     */
+    private Map<String, TaskFactory> getTaskFactory() throws Exception {
+
+        Map<String, TaskFactory> taskMap = new HashMap<>();
+
+        for (String className : blockingTaskClasses) {
+            logger.info("Loading blocking task {}", className);
+            Class<? extends BlockingTask> taskClz;
+            try {
+                taskClz = Class.forName(className).asSubclass(BlockingTask.class);
+            } catch (ClassNotFoundException e) {
+                logger.error("Couldn't find a class with name {}", className);
+                throw e;
+            }
+
+            BlockingTaskDef btDef = taskClz.getAnnotation(BlockingTaskDef.class);
+            if (btDef == null) {
+                throw new Exception("Blocking task class " + className
+                        + " is not annotated with @BlockingTaskDef");
+            }
+
+            // Resolved once here so that a missing no-argument constructor fails at startup
+            Constructor<? extends BlockingTask> constructor = taskClz.getConstructor();
+            taskMap.put(btDef.name(), context -> {
+                try {
+                    BlockingTask blockingTask = constructor.newInstance();
+                    blockingTask.setCallbackContext(context);
+                    return blockingTask;
+                } catch (ReflectiveOperationException e) {
+                    throw new IllegalStateException("Failed to instantiate blocking task " + className, e);
+                }
+            });
+        }
+        return taskMap;
+    }
+
+    /**
+     * Creates the Helix manager, registers the OnlineOffline and Task state model factories,
+     * connects to the cluster and then blocks until the thread is interrupted. The participant
+     * is disconnected on the way out.
+     */
+    private void connect() {
+        try {
+            zkHelixManager = new ZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, zkAddress);
+            // register online-offline model
+            StateMachineEngine machineEngine = zkHelixManager.getStateMachineEngine();
+            OnlineOfflineStateModelFactory factory = new OnlineOfflineStateModelFactory(participantName);
+            machineEngine.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), factory);
+
+            // register task model
+            machineEngine.registerStateModelFactory("Task", new TaskStateModelFactory(zkHelixManager, getTaskFactory()));
+
+            logger.debug("Participant: " + participantName + ", registered state model factories.");
+
+            zkHelixManager.connect();
+            logger.info("Participant: " + participantName + ", has connected to cluster: " + clusterName);
+
+            // Joining the current thread never completes, so this parks it until an interrupt arrives
+            Thread.currentThread().join();
+        } catch (InterruptedException ex) {
+            logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex);
+        } catch (Exception ex) {
+            logger.error("Error in connect() for Participant: " + participantName + ", reason: " + ex, ex);
+        } finally {
+            disconnect();
+        }
+    }
+
+    /**
+     * Disconnects from the cluster. While {@code runningTasks} is not empty the method first sleeps
+     * for the grace period, up to {@code shutdownGraceRetries + 1} times.
+     */
+    private void disconnect() {
+        logger.info("Shutting down participant. Currently available tasks " + runningTasks.size());
+        if (zkHelixManager != null) {
+            if (runningTasks.size() > 0) {
+                for (int i = 0; i <= shutdownGraceRetries; i++) {
+                    logger.info("Shutting down gracefully [RETRY " + i + "]");
+                    try {
+                        Thread.sleep(shutdownGracePeriod);
+                    } catch (InterruptedException e) {
+                        logger.warn("Waiting for running tasks failed [RETRY " + i + "]", e);
+                    }
+                    if (runningTasks.size() == 0) {
+                        break;
+                    }
+                }
+            }
+            logger.info("Participant: " + participantName + ", has disconnected from cluster: " + clusterName);
+            zkHelixManager.disconnect();
+        }
+    }
+
+    public static void main(String args[]) throws Exception {
+        // Forward the command line so that Spring Boot can apply overrides such as --participant.name
+        SpringApplication.run(Participant.class, args);
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
new file mode 100644
index 0000000..1e987a6
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Spring Boot entry point that connects a {@link WorkflowOperator} to the Helix cluster and
+ * submits a sample workflow made of two chained {@link ExampleBlockingTask} instances.
+ */
+@SpringBootApplication()
+@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm")
+public class DataSyncWorkflowManager implements CommandLineRunner {
+
+    private final static Logger logger = LoggerFactory.getLogger(DataSyncWorkflowManager.class);
+
+    @org.springframework.beans.factory.annotation.Value("${cluster.name}")
+    private String clusterName;
+
+    @org.springframework.beans.factory.annotation.Value("${datasync.wm.name}")
+    private String workflowManagerName;
+
+    @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
+    private String zkAddress;
+
+    /**
+     * Initializes the workflow operator and launches a two step workflow in which {@code bt1}
+     * is followed by {@code bt2}.
+     *
+     * @param args command line arguments (not used by this method)
+     * @throws Exception if the operator cannot be initialized or the workflow cannot be submitted
+     */
+    @Override
+    public void run(String... args) throws Exception {
+        WorkflowOperator workflowOperator = new WorkflowOperator();
+        workflowOperator.init(clusterName, workflowManagerName, zkAddress);
+
+        ExampleBlockingTask bt1 = new ExampleBlockingTask();
+        bt1.setTaskId("bt1-" + UUID.randomUUID());
+
+        ExampleBlockingTask bt2 = new ExampleBlockingTask();
+        bt2.setTaskId("bt2-" + UUID.randomUUID());
+
+        // Setting dependency
+        bt1.setOutPort(new OutPort().setNextTaskId(bt2.getTaskId()));
+
+        Map<String, AbstractTask> taskMap = new HashMap<>();
+        taskMap.put(bt1.getTaskId(), bt1);
+        taskMap.put(bt2.getTaskId(), bt2);
+        String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, bt1.getTaskId());
+        logger.info("Launched workflow {}", workflowId);
+    }
+
+    public static void main(String args[]) throws Exception {
+        // Forward the command line so that Spring Boot can apply overrides such as --cluster.name
+        SpringApplication.run(DataSyncWorkflowManager.class, args);
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
new file mode 100644
index 0000000..8590eb5
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskParamType;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.*;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+/**
+ * Builds Helix task framework workflows from graphs of {@link AbstractTask} objects and exposes
+ * basic lifecycle operations (status, stop, resume, delete) through a {@link TaskDriver}.
+ * <p>
+ * {@link #init(String, String, String)} has to be called before any other method is used.
+ */
+public class WorkflowOperator {
+
+    private static final long WORKFLOW_EXPIRY_TIME = 1 * 1000;
+    private static final long TASK_EXPIRY_TIME = 24 * 60 * 60 * 1000;
+
+    private TaskDriver taskDriver;
+    private HelixManager helixManager;
+
+    /**
+     * Connects to the Helix cluster as a spectator and creates the task driver. A JVM shutdown
+     * hook is registered that disconnects the manager if it is still connected at exit.
+     *
+     * @param clusterName         name of the Helix cluster to connect to
+     * @param workflowManagerName instance name used for this connection
+     * @param zkAddress           ZooKeeper connection string
+     * @throws Exception if connecting to the cluster fails
+     */
+    public void init(String clusterName, String workflowManagerName, String zkAddress) throws Exception {
+        helixManager = HelixManagerFactory.getZKHelixManager(clusterName, workflowManagerName,
+                InstanceType.SPECTATOR, zkAddress);
+        helixManager.connect();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            if (helixManager != null && helixManager.isConnected()) {
+                helixManager.disconnect();
+            }
+        }));
+
+        taskDriver = new TaskDriver(helixManager);
+    }
+
+    /** Disconnects from the Helix cluster if {@link #init(String, String, String)} created a manager. */
+    public void destroy() {
+        if (helixManager != null) {
+            helixManager.disconnect();
+        }
+    }
+
+    /**
+     * Converts the task graph reachable from {@code startTaskId} into a Helix workflow, one job per
+     * task, and submits it through the task driver.
+     *
+     * @param taskMap     all tasks of the workflow keyed by task id
+     * @param startTaskId id of the task the workflow starts with
+     * @return the generated workflow name
+     * @throws Exception if the operator is not initialized, a referenced task id is missing from
+     *                   {@code taskMap}, or a task class lacks the {@link BlockingTaskDef} annotation
+     */
+    public String buildAndRunWorkflow(Map<String, AbstractTask> taskMap, String startTaskId) throws Exception {
+
+        if (taskDriver == null) {
+            throw new Exception("Workflow operator needs to be initialized");
+        }
+
+        String workflowName = UUID.randomUUID().toString();
+        Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName).setExpiry(0);
+        buildWorkflowRecursively(workflowBuilder, startTaskId, taskMap, new HashSet<>());
+
+        WorkflowConfig.Builder config = new WorkflowConfig.Builder().setFailureThreshold(0);
+        workflowBuilder.setWorkflowConfig(config.build());
+        workflowBuilder.setExpiry(WORKFLOW_EXPIRY_TIME);
+        Workflow workflow = workflowBuilder.build();
+
+        taskDriver.start(workflow);
+        return workflowName;
+    }
+
+    /**
+     * Adds the task identified by {@code nextTaskId} as a job and then walks its out ports to add
+     * the downstream tasks and the parent/child dependencies between them.
+     *
+     * @param visitedTaskIds ids already added as jobs; keeps a task that is reachable through several
+     *                       parents from being added twice and stops the recursion on cyclic graphs
+     */
+    private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String nextTaskId,
+                                          Map<String, AbstractTask> taskMap, Set<String> visitedTaskIds)
+            throws Exception {
+        if (!visitedTaskIds.add(nextTaskId)) {
+            return;
+        }
+
+        AbstractTask currentTask = taskMap.get(nextTaskId);
+        if (currentTask == null) {
+            throw new Exception("Task " + nextTaskId + " is not available in the task map");
+        }
+
+        BlockingTaskDef taskDef = currentTask.getClass().getAnnotation(BlockingTaskDef.class);
+        if (taskDef == null) {
+            throw new Exception("Task class " + currentTask.getClass().getName()
+                    + " is not annotated with @BlockingTaskDef");
+        }
+
+        TaskConfig.Builder taskBuilder = new TaskConfig.Builder()
+                .setTaskId(currentTask.getTaskId())
+                .setCommand(taskDef.name());
+
+        Map<String, String> paramMap = serializeTaskData(currentTask);
+        paramMap.forEach(taskBuilder::addConfig);
+
+        List<TaskConfig> taskBuilds = new ArrayList<>();
+        taskBuilds.add(taskBuilder.build());
+
+        JobConfig.Builder job = new JobConfig.Builder()
+                .addTaskConfigs(taskBuilds)
+                .setFailureThreshold(0)
+                .setExpiry(WORKFLOW_EXPIRY_TIME)
+                .setTimeoutPerTask(TASK_EXPIRY_TIME)
+                .setNumConcurrentTasksPerInstance(20)
+                .setMaxAttemptsPerTask(currentTask.getRetryCount());
+
+        workflowBuilder.addJob(currentTask.getTaskId(), job);
+
+        for (OutPort outPort : getOutPortsOfTask(currentTask)) {
+            // An unset out port, or one without a target, marks the end of a branch
+            if (outPort != null && outPort.getNextTaskId() != null) {
+                workflowBuilder.addParentChildDependency(currentTask.getTaskId(), outPort.getNextTaskId());
+                buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap, visitedTaskIds);
+            }
+        }
+    }
+
+    /**
+     * Returns the name of the current state of the given workflow.
+     *
+     * @throws IllegalStateException if the task driver has no context for {@code workflowName}
+     */
+    public String getWorkflowStatus(String workflowName) {
+        WorkflowContext workflowContext = taskDriver.getWorkflowContext(workflowName);
+        if (workflowContext == null) {
+            throw new IllegalStateException("No workflow context is available for workflow " + workflowName);
+        }
+        return workflowContext.getWorkflowState().name();
+    }
+
+    /** Asks the task driver to stop the given workflow. */
+    public void stopWorkflow(String workflowName) {
+        taskDriver.stop(workflowName);
+    }
+
+    /** Asks the task driver to resume the given workflow. */
+    public void resumeWorkflow(String workflowName) {
+        taskDriver.resume(workflowName);
+    }
+
+    /** Asks the task driver to delete the given workflow. */
+    public void deleteWorkflow(String workflowName) {
+        taskDriver.delete(workflowName);
+    }
+
+    /**
+     * Collects the values of all fields annotated with {@link TaskParam} or {@link TaskOutPort},
+     * declared on the task class or any of its super classes, into a string map. Fields whose
+     * value is null are left out.
+     */
+    private <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException {
+
+        Map<String, String> result = new HashMap<>();
+        for (Class<?> c = data.getClass(); c != null; c = c.getSuperclass()) {
+            for (Field classField : c.getDeclaredFields()) {
+                TaskParam parm = classField.getAnnotation(TaskParam.class);
+                if (parm != null) {
+                    classField.setAccessible(true);
+                    Object value = classField.get(data);
+                    if (value instanceof TaskParamType) {
+                        result.put(parm.name(), ((TaskParamType) value).serialize());
+                    } else if (value != null) {
+                        // A parameter that was never set has nothing to serialize
+                        result.put(parm.name(), value.toString());
+                    }
+                }
+
+                TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
+                if (outPort != null) {
+                    classField.setAccessible(true);
+                    OutPort port = (OutPort) classField.get(data);
+                    if (port != null && port.getNextTaskId() != null) {
+                        result.put(outPort.name(), port.getNextTaskId().toString());
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    /** Returns the values of all {@link TaskOutPort} fields of the task; unset ports appear as null. */
+    private <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T taskObj) throws IllegalAccessException {
+
+        List<OutPort> outPorts = new ArrayList<>();
+        for (Class<?> c = taskObj.getClass(); c != null; c = c.getSuperclass()) {
+            for (Field field : c.getDeclaredFields()) {
+                if (field.getAnnotation(TaskOutPort.class) != null) {
+                    field.setAccessible(true);
+                    outPorts.add((OutPort) field.get(taskObj));
+                }
+            }
+        }
+        return outPorts;
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
new file mode 100644
index 0000000..c9ceee9
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractTask extends UserContentStore implements Task {
+
+ private final static Logger logger = LoggerFactory.getLogger(AbstractTask.class);
+
+ private TaskCallbackContext callbackContext;
+
+ @TaskOutPort(name = "nextTask")
+ private OutPort outPort;
+
+ @TaskParam(name = "taskId")
+ private String taskId;
+
+ @TaskParam(name = "retryCount")
+ private int retryCount = 3;
+
+ public AbstractTask() {
+
+ }
+
+ @Override
+ public TaskResult run() {
+ try {
+ String helixTaskId = this.callbackContext.getTaskConfig().getId();
+ logger.info("Running task {}", helixTaskId);
+ deserializeTaskData(this, this.callbackContext.getTaskConfig().getConfigMap());
+ } catch (Exception e) {
+ logger.error("Failed at deserializing task data", e);
+ return new TaskResult(TaskResult.Status.FAILED, "Failed in deserializing task data");
+ }
+ return onRun();
+ }
+
+ @Override
+ public void cancel() {
+ onCancel();
+ }
+
+ public abstract TaskResult onRun();
+
+ public abstract void onCancel();
+
+ public OutPort getOutPort() {
+ return outPort;
+ }
+
+ public void setOutPort(OutPort outPort) {
+ this.outPort = outPort;
+ }
+
+ public int getRetryCount() {
+ return retryCount;
+ }
+
+ public void setRetryCount(int retryCount) {
+ this.retryCount = retryCount;
+ }
+
+ public TaskCallbackContext getCallbackContext() {
+ return callbackContext;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+
+ public void setCallbackContext(TaskCallbackContext callbackContext) {
+ this.callbackContext = callbackContext;
+ }
+
+ private <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException {
+
+ List<Field> allFields = new ArrayList<>();
+ Class genericClass = instance.getClass();
+
+ while (AbstractTask.class.isAssignableFrom(genericClass)) {
+ Field[] declaredFields = genericClass.getDeclaredFields();
+ for (Field declaredField : declaredFields) {
+ allFields.add(declaredField);
+ }
+ genericClass = genericClass.getSuperclass();
+ }
+
+ for (Field classField : allFields) {
+ TaskParam param = classField.getAnnotation(TaskParam.class);
+ if (param != null) {
+ if (params.containsKey(param.name())) {
+ classField.setAccessible(true);
+ if (classField.getType().isAssignableFrom(String.class)) {
+ classField.set(instance, params.get(param.name()));
+ } else if (classField.getType().isAssignableFrom(Integer.class) ||
+ classField.getType().isAssignableFrom(Integer.TYPE)) {
+ classField.set(instance, Integer.parseInt(params.get(param.name())));
+ } else if (classField.getType().isAssignableFrom(Long.class) ||
+ classField.getType().isAssignableFrom(Long.TYPE)) {
+ classField.set(instance, Long.parseLong(params.get(param.name())));
+ } else if (classField.getType().isAssignableFrom(Boolean.class) ||
+ classField.getType().isAssignableFrom(Boolean.TYPE)) {
+ classField.set(instance, Boolean.parseBoolean(params.get(param.name())));
+ } else if (TaskParamType.class.isAssignableFrom(classField.getType())) {
+ Class<?> clazz = classField.getType();
+ Constructor<?> ctor = clazz.getConstructor();
+ Object obj = ctor.newInstance();
+ ((TaskParamType)obj).deserialize(params.get(param.name()));
+ classField.set(instance, obj);
+ }
+ }
+ }
+ }
+
+ for (Field classField : allFields) {
+ TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
+ if (outPort != null) {
+ classField.setAccessible(true);
+ OutPort op = new OutPort();
+ op.setNextTaskId(params.get(outPort.name()));
+ }
+ }
+ }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
new file mode 100644
index 0000000..9033f1a
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BlockingTask extends AbstractTask {
+
+ private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
+
+ public BlockingTask() {
+ }
+
+ @Override
+ public TaskResult onRun() {
+ return runBlockingCode();
+ }
+
+ public abstract TaskResult runBlockingCode();
+
+ @Override
+ public void onCancel() {
+
+ }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
new file mode 100644
index 0000000..9d2532c
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+
+import org.apache.helix.task.TaskResult;
+
+/**
+ * Non-blocking counterpart of BlockingTask. Neither hook is implemented yet:
+ * onRun() returns null and onCancel() is empty.
+ *
+ * NOTE(review): AbstractTask.run() returns the value of onRun() directly, so run() yields
+ * null for this class - confirm the task framework tolerates a null TaskResult.
+ */
+public class NonBlockingTask extends AbstractTask {
+
+ public NonBlockingTask() {
+ }
+
+ // Always returns null; no non-blocking execution logic exists here yet.
+ @Override
+ public TaskResult onRun() {
+ return null;
+ }
+
+ // No cancellation handling is implemented here.
+ @Override
+ public void onCancel() {
+
+ }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java
new file mode 100644
index 0000000..2b4bb09
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+
+/**
+ * Outgoing edge of a task: holds the id of the task that follows.
+ */
+public class OutPort {
+
+    private String nextTaskId;
+
+    /**
+     * Sets the id of the following task.
+     *
+     * @param nextTaskId id to store; may be null
+     * @return this port, so calls can be chained
+     */
+    public OutPort setNextTaskId(String nextTaskId) {
+        this.nextTaskId = nextTaskId;
+        return this;
+    }
+
+    /**
+     * @return the stored id of the following task, or null when none was set
+     */
+    public String getNextTaskId() {
+        return this.nextTaskId;
+    }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java
new file mode 100644
index 0000000..08f0fb0
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+
+/**
+ * Contract for task parameter types that convert themselves to and from a string.
+ */
+public interface TaskParamType {
+
+    /**
+     * @return string form of this value
+     */
+    String serialize();
+
+    /**
+     * Restores this value from its string form.
+     *
+     * @param content string previously produced by {@link #serialize()}
+     */
+    void deserialize(String content);
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java
new file mode 100644
index 0000000..06735bc
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level annotation, retained at runtime, that gives a blocking task class a name.
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface BlockingTaskDef {
+
+    /**
+     * @return name of the task definition
+     */
+    String name();
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
new file mode 100644
index 0000000..8047a9b
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Method-level annotation, retained at runtime, that attaches an integer order to a method.
+ *
+ * NOTE(review): presumably marks the ordered sections of a non-blocking task - confirm once
+ * a consumer of this annotation exists.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface NonBlockingSection {
+
+    /**
+     * @return order value assigned to the annotated method
+     */
+    int order();
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java
new file mode 100644
index 0000000..d38dc30
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level annotation, retained at runtime, that gives a non-blocking task class a name.
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface NonBlockingTaskDef {
+
+    /**
+     * @return name of the task definition
+     */
+    String name();
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java
new file mode 100644
index 0000000..ce8e9ae
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Field-level annotation, retained at runtime, that marks an out port field of a task and
+ * names the key under which the port is stored in the serialized task data.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface TaskOutPort {
+
+    /**
+     * @return key of the out port in the serialized task data
+     */
+    String name();
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java
new file mode 100644
index 0000000..fe94273
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Field-level annotation, retained at runtime, that marks a task field as a parameter and
+ * names the key under which its value is stored in the serialized task data.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface TaskParam {
+
+    /**
+     * @return key of the parameter in the serialized task data
+     */
+    String name();
+
+    /**
+     * @return declared default value; an empty string unless overridden
+     */
+    String defaultValue() default "";
+
+    /**
+     * @return whether the parameter is declared mandatory; false unless overridden
+     */
+    boolean mandatory() default false;
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
new file mode 100644
index 0000000..93ec010
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@BlockingTaskDef(name = "ExampleBlockingTask")
+public class ExampleBlockingTask extends BlockingTask {
+
+ private final static Logger logger = LoggerFactory.getLogger(ExampleBlockingTask.class);
+
+ @Override
+ public TaskResult runBlockingCode() {
+ logger.info("Running example blocking task {}", getTaskId());
+ return new TaskResult(TaskResult.Status.COMPLETED, "Success");
+ }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
new file mode 100644
index 0000000..527d0a2
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
+
+/**
+ * Example non-blocking task, annotated with the definition name "ExampleNonBlockingTask".
+ * It declares no behavior of its own; everything is inherited from NonBlockingTask.
+ */
+@NonBlockingTaskDef(name = "ExampleNonBlockingTask")
+public class ExampleNonBlockingTask extends NonBlockingTask {
+
+ public ExampleNonBlockingTask() {
+ }
+}
diff --git a/data-orchestrator/workflow-engine/src/main/resources/application.properties b/data-orchestrator/workflow-engine/src/main/resources/application.properties
new file mode 100644
index 0000000..ea188ec
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/resources/application.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Cluster and controller names plus the ZooKeeper connection string (host:port).
+# NOTE(review): presumably read by the Controller and Participant services added in this commit - confirm.
+cluster.name=datalake
+controller.name=datalake_controller
+zookeeper.connection=localhost:2181
+
+# Participant name and the task list file; a task-list.yaml resource is added next to this file.
+participant.name=datalake_participant
+task.list.file=task-list.yaml
+
+# NOTE(review): looks like the name used by the data sync workflow manager - confirm against DataSyncWorkflowManager.
+datasync.wm.name=datasync_wf
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/src/main/resources/logback.xml b/data-orchestrator/workflow-engine/src/main/resources/logback.xml
new file mode 100644
index 0000000..3afe661
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/resources/logback.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<configuration>
+
+    <!-- Console output: timestamp, thread, level, logger (abbreviated to 30 chars), message, MDC. -->
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+        </encoder>
+    </appender>
+
+    <!-- File output, rolled daily; keeps at most 30 days of history and 1GB in total. -->
+    <appender name="LOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>../logs/airavata.log</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>../logs/airavata.log.%d{yyyy-MM-dd}</fileNamePattern>
+            <maxHistory>30</maxHistory>
+            <totalSizeCap>1GB</totalSizeCap>
+        </rollingPolicy>
+    </appender>
+
+    <!-- One level per logger. org.apache.helix was declared twice (WARN, then ERROR);
+         only the later ERROR declaration is kept. -->
+    <logger name="ch.qos.logback" level="WARN"/>
+    <logger name="org.apache.zookeeper" level="ERROR"/>
+    <logger name="org.apache.helix" level="ERROR"/>
+    <logger name="org.apache.airavata" level="INFO"/>
+    <logger name="org.hibernate" level="ERROR"/>
+    <logger name="net.schmizz.sshj" level="WARN"/>
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="LOGFILE"/>
+    </root>
+</configuration>
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/src/main/resources/task-list.yaml b/data-orchestrator/workflow-engine/src/main/resources/task-list.yaml
new file mode 100644
index 0000000..e9b17a4
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/resources/task-list.yaml
@@ -0,0 +1,5 @@
+# Fully-qualified task class names, grouped into blocking and non-blocking lists.
+# NOTE(review): presumably the file referenced by task.list.file in application.properties - confirm.
+tasks:
+ blocking:
+ - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask
+ nonBlocking:
+ - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 69205f7..9489f35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,6 +147,8 @@
<neo4j.version>3.4.6</neo4j.version>
<io.grpc.version>1.25.0</io.grpc.version>
<spring-security.version>5.3.4.RELEASE</spring-security.version>
+ <yaml.version>1.15</yaml.version>
+ <spring.boot.version>2.2.1.RELEASE</spring.boot.version>
</properties>