You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2017/12/06 03:13:48 UTC
[airavata-sandbox] 04/19: Initial implementation of task execution
engine on top of helix
This is an automated email from the ASF dual-hosted git repository.
smarru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-sandbox.git
commit 23b761ec3813f6db12ad2d851924d40021017fd9
Author: dimuthu.upeksha2@gmail.com <Di...@1234>
AuthorDate: Sun Nov 19 01:19:04 2017 +0530
Initial implementation of task execution engine on top of helix
---
.../k8s/compute/impl/SSHComputeOperations.java | 152 ++++++++++++-------
airavata-kubernetes/modules/helix-tasks/pom.xml | 89 +++++++++++
.../org/apache/airavata/helix/HelixCluster.java | 48 ++++++
.../org/apache/airavata/helix/HelixController.java | 85 +++++++++++
.../org/apache/airavata/helix/HelixManager.java | 14 ++
.../apache/airavata/helix/HelixParticipant.java | 143 ++++++++++++++++++
.../org/apache/airavata/helix/WorkflowManager.java | 86 +++++++++++
.../apache/airavata/helix/tasks/AbstractTask.java | 139 ++++++++++++++++++
.../airavata/helix/tasks/DataCollectingTask.java | 32 ++++
.../airavata/helix/tasks/DataPushingTask.java | 39 +++++
.../airavata/helix/tasks/command/CommandTask.java | 137 +++++++++++++++++
.../airavata/helix/tasks/command/Participant.java | 55 +++++++
.../airavata/helix/tasks/datain/DataInputTask.java | 123 ++++++++++++++++
.../airavata/helix/tasks/datain/Participant.java | 56 +++++++
.../helix/tasks/dataout/DataOutputTask.java | 128 ++++++++++++++++
.../airavata/helix/tasks/dataout/Participant.java | 54 +++++++
.../src/main/resources/log4j.properties | 9 ++
.../api/server/controller/DataStoreController.java | 9 +-
.../k8s/api/server/model/data/DataStoreModel.java | 11 +-
.../k8s/api/server/service/WorkflowService.java | 6 +-
.../api/server/service/data/DataStoreService.java | 7 +-
.../api/server/service/util/ToResourceUtil.java | 6 +-
.../modules/microservices/task-scheduler/pom.xml | 18 +++
.../k8s/gfac/core/HelixWorkflowManager.java | 162 +++++++++++++++++++++
.../airavata/k8s/gfac/messaging/KafkaReceiver.java | 12 +-
.../k8s/gfac/service/HelixWorkflowService.java | 17 +++
.../airavata/k8s/gfac/service/WorkerService.java | 31 +++-
.../src/main/resources/log4j.properties | 9 ++
airavata-kubernetes/pom.xml | 1 +
29 files changed, 1593 insertions(+), 85 deletions(-)
diff --git a/airavata-kubernetes/modules/compute-resource-api/src/main/java/org/apache/airavata/k8s/compute/impl/SSHComputeOperations.java b/airavata-kubernetes/modules/compute-resource-api/src/main/java/org/apache/airavata/k8s/compute/impl/SSHComputeOperations.java
index 140c5b6..fb77981 100644
--- a/airavata-kubernetes/modules/compute-resource-api/src/main/java/org/apache/airavata/k8s/compute/impl/SSHComputeOperations.java
+++ b/airavata-kubernetes/modules/compute-resource-api/src/main/java/org/apache/airavata/k8s/compute/impl/SSHComputeOperations.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -51,46 +50,11 @@ public class SSHComputeOperations implements ComputeOperations {
this.port = port;
}
- public ExecutionResult executeCommand(String command) throws JSchException, IOException {
- JSch jsch = new JSch();
- Session session = jsch.getSession(userName, this.computeHost, port);
- session.setConfig("StrictHostKeyChecking", "no");
+ public ExecutionResult executeCommand(String command) throws Exception {
+ Session session = getConnectedSession(this.userName, this.password, this.computeHost, this.port);
- session.setUserInfo(new UserInfo() {
- @Override
- public String getPassphrase() {
- return password;
- }
-
- @Override
- public String getPassword() {
- return password;
- }
-
- @Override
- public boolean promptPassword(String s) {
- return true;
- }
-
- @Override
- public boolean promptPassphrase(String s) {
- return false;
- }
-
- @Override
- public boolean promptYesNo(String s) {
- return false;
- }
-
- @Override
- public void showMessage(String s) {
-
- }
- });
-
- session.connect();
- Channel channel=session.openChannel("exec");
- ((ChannelExec)channel).setCommand(command);
+ Channel channel = session.openChannel("exec");
+ ((ChannelExec) channel).setCommand(command);
ByteArrayOutputStream sysOut = new ByteArrayOutputStream();
channel.setOutputStream(sysOut);
@@ -104,9 +68,9 @@ public class SSHComputeOperations implements ComputeOperations {
ExecutionResult result = new ExecutionResult();
byte[] tmp = new byte[1024];
while (true) {
- while (in.available()>0) {
+ while (in.available() > 0) {
int i = in.read(tmp, 0, 1024);
- if (i<0) break;
+ if (i < 0) break;
System.out.print(new String(tmp, 0, i));
}
if (channel.isClosed()) {
@@ -117,7 +81,8 @@ public class SSHComputeOperations implements ComputeOperations {
}
try {
Thread.sleep(1000);
- } catch(Exception e){}
+ } catch (Exception e) {
+ }
}
channel.disconnect();
@@ -128,16 +93,27 @@ public class SSHComputeOperations implements ComputeOperations {
return result;
}
- public void transferDataIn(String source, String target, String protocol) {
-
+ public void transferDataIn(String source, String target, String protocol) throws Exception {
+ Session session = getConnectedSession(this.userName, this.password, this.computeHost, this.port);
+ copyLocalToRemote(session, source, target);
}
public void transferDataOut(String source, String target, String protocol) throws Exception {
+ Session session = getConnectedSession(this.userName, this.password, this.computeHost, this.port);
+ copyRemoteToLocal(session, source, target);
+ }
+
+ private static Session getConnectedSession(String userName, String password, String computeHost, int port) throws Exception {
JSch jsch = new JSch();
- Session session = jsch.getSession(userName, this.computeHost, port);
+ Session session = jsch.getSession(userName, computeHost, port);
session.setConfig("StrictHostKeyChecking", "no");
+ session.setUserInfo(getUserInfo(password));
+ session.connect();
+ return session;
+ }
- session.setUserInfo(new UserInfo() {
+ private static UserInfo getUserInfo(String password) {
+ return new UserInfo() {
@Override
public String getPassphrase() {
return password;
@@ -167,11 +143,79 @@ public class SSHComputeOperations implements ComputeOperations {
public void showMessage(String s) {
}
- });
+ };
+ }
- session.connect();
+ private static void copyLocalToRemote(Session session, String source, String target) throws Exception {
- copyRemoteToLocal(session, source, target);
+ FileInputStream fis = null;
+ boolean ptimestamp = true;
+
+ // exec 'scp -t rfile' remotely
+ String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + target;
+ Channel channel = session.openChannel("exec");
+ ((ChannelExec) channel).setCommand(command);
+
+ // get I/O streams for remote scp
+ OutputStream out = channel.getOutputStream();
+ InputStream in = channel.getInputStream();
+
+ channel.connect();
+
+ if (checkAck(in) != 0) {
+ return;
+ }
+
+ File _lfile = new File(source);
+
+ if (ptimestamp) {
+ command = "T " + (_lfile.lastModified() / 1000) + " 0";
+ // The access time should be sent here,
+ // but it is not accessible with JavaAPI ;-<
+ command += (" " + (_lfile.lastModified() / 1000) + " 0\n");
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ return;
+ }
+ }
+
+ // send "C0644 filesize filename", where filename should not include '/'
+ long filesize = _lfile.length();
+ command = "C0644 " + filesize + " ";
+ if (source.lastIndexOf('/') > 0) {
+ command += source.substring(source.lastIndexOf('/') + 1);
+ } else {
+ command += source;
+ }
+ command += "\n";
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ return;
+ }
+
+ // send a content of lfile
+ fis = new FileInputStream(source);
+ byte[] buf = new byte[1024];
+ while (true) {
+ int len = fis.read(buf, 0, buf.length);
+ if (len <= 0) break;
+ out.write(buf, 0, len); //out.flush();
+ }
+ fis.close();
+ fis = null;
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+ if (checkAck(in) != 0) {
+ return;
+ }
+ out.close();
+
+ channel.disconnect();
+ session.disconnect();
}
private static void copyRemoteToLocal(Session session, String source, String target) throws JSchException, IOException {
@@ -246,7 +290,7 @@ public class SSHComputeOperations implements ComputeOperations {
}
if (checkAck(in) != 0) {
- System.exit(0);
+ return;
}
// send '\0'
diff --git a/airavata-kubernetes/modules/helix-tasks/pom.xml b/airavata-kubernetes/modules/helix-tasks/pom.xml
new file mode 100644
index 0000000..568c922
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-kubernetes</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>helix-tasks</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-web</artifactId>
+ <version>3.0.2.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>api-resource</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>compute-resource-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.10.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>0.6.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.5</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>6.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ <version>0.1.53</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>2.8.0</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixCluster.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixCluster.java
new file mode 100644
index 0000000..2b96328
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixCluster.java
@@ -0,0 +1,48 @@
+package org.apache.airavata.helix;
+
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class HelixCluster {
+
+ private static final Logger logger = LogManager.getLogger(HelixCluster.class);
+
+ private String zkAddress;
+ private String clusterName;
+ private int numPartitions;
+
+ private ZkClient zkClient;
+ private ZKHelixAdmin zkHelixAdmin;
+
+ public HelixCluster(String zkAddress, String clusterName, int numPartitions) {
+ this.zkAddress = zkAddress;
+ this.clusterName = clusterName;
+ this.numPartitions = numPartitions;
+
+ zkClient = new ZkClient(this.zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ zkHelixAdmin = new ZKHelixAdmin(zkClient);
+ }
+
+ public void setup() {
+ zkHelixAdmin.addCluster(clusterName, true);
+ zkHelixAdmin.addStateModelDef(clusterName, OnlineOfflineSMD.name, OnlineOfflineSMD.build());
+ logger.info("Cluster: " + clusterName + ", has been added.");
+ }
+
+ public void disconnect() {
+ if (zkClient != null) {
+ zkClient.close();
+ }
+ }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixController.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixController.java
new file mode 100644
index 0000000..8237d43
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixController.java
@@ -0,0 +1,85 @@
+package org.apache.airavata.helix;
+
+import org.apache.helix.*;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class HelixController implements Runnable {
+
+ private static final Logger logger = LogManager.getLogger(HelixController.class);
+
+ private String clusterName;
+ private String controllerName;
+ private String zkAddress;
+ private org.apache.helix.HelixManager zkHelixManager;
+
+ private CountDownLatch startLatch = new CountDownLatch(1);
+ private CountDownLatch stopLatch = new CountDownLatch(1);
+
+ public HelixController(String zkAddress, String clusterName, String controllerName) {
+ this.clusterName = clusterName;
+ this.controllerName = controllerName;
+ this.zkAddress = zkAddress;
+ }
+
+ public void run() {
+ try {
+ zkHelixManager = HelixControllerMain.startHelixController(zkAddress, clusterName,
+ controllerName, HelixControllerMain.STANDALONE);
+ startLatch.countDown();
+ stopLatch.await();
+ } catch (Exception ex) {
+ logger.error("Error in run() for Controller: " + controllerName + ", reason: " + ex, ex);
+ } finally {
+ disconnect();
+ }
+
+ }
+
+ public void start() {
+ new Thread(this).start();
+ try {
+ startLatch.await();
+ logger.info("Controller: " + controllerName + ", has connected to cluster: " + clusterName);
+
+ Runtime.getRuntime().addShutdownHook(
+ new Thread() {
+ @Override
+ public void run() {
+ disconnect();
+ }
+ }
+ );
+
+ } catch (InterruptedException ex) {
+ logger.error("Controller: " + controllerName + ", is interrupted! reason: " + ex, ex);
+ }
+
+ }
+
+ public void stop() {
+ stopLatch.countDown();
+ }
+
+ private void disconnect() {
+ if (zkHelixManager != null) {
+ logger.info("Controller: " + controllerName + ", has disconnected from cluster: " + clusterName);
+ zkHelixManager.disconnect();
+ }
+ }
+
+ public static void main(String args[]) {
+ HelixController helixController = new HelixController("localhost:2199", "AiravataDemoCluster", "AiravataController");
+ helixController.start();
+ }
+
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixManager.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixManager.java
new file mode 100644
index 0000000..e09f307
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixManager.java
@@ -0,0 +1,14 @@
+package org.apache.airavata.helix;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class HelixManager {
+ public static void main(String args[]) {
+ HelixCluster helixCluster = new HelixCluster("localhost:2199", "AiravataDemoCluster", 1);
+ helixCluster.setup();
+ }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixParticipant.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixParticipant.java
new file mode 100644
index 0000000..cbbc300
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixParticipant.java
@@ -0,0 +1,143 @@
+package org.apache.airavata.helix;
+
+import org.apache.airavata.helix.tasks.command.CommandTask;
+import org.apache.airavata.helix.tasks.DataCollectingTask;
+import org.apache.airavata.helix.tasks.DataPushingTask;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public abstract class HelixParticipant implements Runnable {
+
+ private static final Logger logger = LogManager.getLogger(HelixParticipant.class);
+
+ private String zkAddress;
+ private String clusterName;
+ private String participantName;
+ private ZKHelixManager zkHelixManager;
+ private String taskTypeName;
+ private String apiServerUrl;
+ private RestTemplate restTemplate;
+
+ public HelixParticipant(String zkAddress,
+ String clusterName,
+ String participantName,
+ String taskTypeName,
+ String apiServerUrl) {
+
+ logger.debug("Initializing Participant Node");
+ this.zkAddress = zkAddress;
+ this.clusterName = clusterName;
+ this.participantName = participantName;
+ this.taskTypeName = taskTypeName;
+ this.apiServerUrl = apiServerUrl;
+ this.restTemplate = new RestTemplate();
+ }
+
+ public abstract Map<String, TaskFactory> getTaskFactory();
+
+ public abstract TaskTypeResource getTaskType();
+
+ public void run() {
+ ZkClient zkClient = null;
+ try {
+ zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
+
+ List<String> nodesInCluster = zkHelixAdmin.getInstancesInCluster(clusterName);
+
+ if (!nodesInCluster.contains(participantName)) {
+ InstanceConfig instanceConfig = new InstanceConfig(participantName);
+ instanceConfig.setHostName("localhost");
+ instanceConfig.setInstanceEnabled(true);
+ instanceConfig.addTag(taskTypeName);
+ zkHelixAdmin.addInstance(clusterName, instanceConfig);
+ logger.debug("Instance: " + participantName + ", has been added to cluster: " + clusterName);
+ } else {
+ zkHelixAdmin.addInstanceTag(clusterName, participantName, taskTypeName);
+ }
+
+ Runtime.getRuntime().addShutdownHook(
+ new Thread() {
+ @Override
+ public void run() {
+ logger.debug("Participant: " + participantName + ", shutdown hook called.");
+ disconnect();
+ }
+ }
+ );
+
+ // connect the participant manager
+ register();
+ connect();
+ } catch (Exception ex) {
+ logger.error("Error in run() for Participant: " + participantName + ", reason: " + ex, ex);
+ } finally {
+ if (zkClient != null) {
+ zkClient.close();
+ }
+ }
+ }
+
+ private void register() {
+ this.restTemplate.postForObject("http://" + apiServerUrl + "/taskType", getTaskType(), Long.class);
+ }
+
+ private void connect() {
+ try {
+ zkHelixManager = new ZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, zkAddress);
+ // register online-offline model
+ StateMachineEngine machineEngine = zkHelixManager.getStateMachineEngine();
+ OnlineOfflineStateModelFactory factory = new OnlineOfflineStateModelFactory(participantName);
+ machineEngine.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), factory);
+
+ // register task model
+ machineEngine.registerStateModelFactory("Task", new TaskStateModelFactory(zkHelixManager, getTaskFactory()));
+ logger.debug("Participant: " + participantName + ", registered state model factories.");
+
+ zkHelixManager.connect();
+ logger.info("Participant: " + participantName + ", has connected to cluster: " + clusterName);
+
+ Thread.currentThread().join();
+ } catch (InterruptedException ex) {
+ logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex);
+ }
+ catch (Exception ex) {
+ logger.error("Error in connect() for Participant: " + participantName + ", reason: " + ex, ex);
+ } finally {
+ disconnect();
+ }
+ }
+
+ private void disconnect() {
+ if (zkHelixManager != null) {
+ logger.info("Participant: " + participantName + ", has disconnected from cluster: " + clusterName);
+ zkHelixManager.disconnect();
+ }
+ }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/WorkflowManager.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/WorkflowManager.java
new file mode 100644
index 0000000..6866af3
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/WorkflowManager.java
@@ -0,0 +1,86 @@
+package org.apache.airavata.helix;
+
+import org.apache.airavata.helix.tasks.command.CommandTask;
+import org.apache.airavata.helix.tasks.DataCollectingTask;
+import org.apache.airavata.helix.tasks.DataPushingTask;
+import org.apache.helix.*;
+import org.apache.helix.task.*;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class WorkflowManager {
+
+ private static final Logger logger = LogManager.getLogger(WorkflowManager.class);
+
+
+ public static void main(String args[]) {
+ Workflow workflow = createWorkflow().build();
+
+ org.apache.helix.HelixManager helixManager = HelixManagerFactory.getZKHelixManager("AiravataDemoCluster", "Admin",
+ InstanceType.SPECTATOR, "localhost:2199");
+
+ try {
+ helixManager.connect();
+ TaskDriver taskDriver = new TaskDriver(helixManager);
+
+ Runtime.getRuntime().addShutdownHook(
+ new Thread() {
+ @Override
+ public void run() {
+ helixManager.disconnect();
+ }
+ }
+ );
+
+ taskDriver.start(workflow);
+ logger.info("Started workflow");
+ TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED);
+ System.out.println("Task state " + taskState.name());
+
+ } catch (Exception ex) {
+ logger.error("Error in connect() for Admin, reason: " + ex, ex);
+ }
+ }
+
+ private static Workflow.Builder createWorkflow() {
+ List<TaskConfig> downloadDataTasks = new ArrayList<>();
+ downloadDataTasks.add(new TaskConfig.Builder().setTaskId("Download_Task").setCommand(DataCollectingTask.NAME).build());
+
+ List<TaskConfig> commandExecuteTasks = new ArrayList<>();
+ commandExecuteTasks.add(new TaskConfig.Builder().setTaskId("Command_Task").setCommand(CommandTask.NAME).build());
+
+ List<TaskConfig> pushDataTasks = new ArrayList<>();
+ pushDataTasks.add(new TaskConfig.Builder().setTaskId("Push_Task").setCommand(DataPushingTask.NAME).build());
+
+ JobConfig.Builder downloadDataJob = new JobConfig.Builder()
+ .addTaskConfigs(downloadDataTasks)
+ .setMaxAttemptsPerTask(3).setInstanceGroupTag("p1");
+
+ JobConfig.Builder commandExecuteJob = new JobConfig.Builder()
+ .addTaskConfigs(commandExecuteTasks)
+ .setMaxAttemptsPerTask(3).setInstanceGroupTag("p2");
+
+ JobConfig.Builder dataPushJob = new JobConfig.Builder()
+ .addTaskConfigs(pushDataTasks)
+ .setMaxAttemptsPerTask(3).setInstanceGroupTag("p3");
+
+ Workflow.Builder workflow = new Workflow.Builder("Airavata_Workflow3").setExpiry(0);
+ workflow.addJob("downloadDataJob", downloadDataJob);
+ workflow.addJob("commandExecuteJob", commandExecuteJob);
+ workflow.addJob("dataPushJob", dataPushJob);
+
+ workflow.addParentChildDependency("downloadDataJob", "commandExecuteJob");
+ workflow.addParentChildDependency("downloadDataJob", "dataPushJob");
+
+ return workflow;
+ }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/AbstractTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/AbstractTask.java
new file mode 100644
index 0000000..8e581ea
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/AbstractTask.java
@@ -0,0 +1,139 @@
+package org.apache.airavata.helix.tasks;
+
+import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.airavata.k8s.compute.api.ComputeOperations;
+import org.apache.airavata.k8s.compute.impl.MockComputeOperation;
+import org.apache.airavata.k8s.compute.impl.SSHComputeOperations;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.Properties;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public abstract class AbstractTask extends UserContentStore implements Task {
+
+ public static final String NEXT_JOB = "next-job";
+ public static final String WORKFLOW_STARTED = "workflow-started";
+ public static final String TASK_ID = "task_id";
+ public static final String PROCESS_ID = "process_id";
+
+ //Configurable values
+ private String apiServerUrl = "localhost:8080";
+ private String kafkaBootstrapUrl = "localhost:9092";
+ private String eventTopic = "airavata-task-event";
+
+ private TaskCallbackContext callbackContext;
+ private RestTemplate restTemplate;
+ private Producer<String, String> eventProducer;
+ private long processId;
+ private long taskId;
+
+ public AbstractTask(TaskCallbackContext callbackContext) {
+ this.callbackContext = callbackContext;
+ this.taskId = Long.parseLong(this.callbackContext.getTaskConfig().getConfigMap().get(TASK_ID));
+ this.processId = Long.parseLong(this.callbackContext.getTaskConfig().getConfigMap().get(PROCESS_ID));
+ this.restTemplate = new RestTemplate();
+ initializeKafkaEventProducer();
+ init();
+ }
+
+ public TaskCallbackContext getCallbackContext() {
+ return callbackContext;
+ }
+
+ @Override
+ public final TaskResult run() {
+ boolean isThisNextJob = getUserContent(WORKFLOW_STARTED, Scope.WORKFLOW) == null ||
+ this.callbackContext.getJobConfig().getJobId()
+ .equals(this.callbackContext.getJobConfig().getWorkflow() + "_" + getUserContent(NEXT_JOB, Scope.WORKFLOW));
+ if (isThisNextJob) {
+ return onRun();
+ } else {
+ return new TaskResult(TaskResult.Status.COMPLETED, "Not a target job");
+ }
+ }
+
+ @Override
+ public final void cancel() {
+ onCancel();
+ }
+
+ public void init() {
+
+ }
+
+ public abstract TaskResult onRun();
+
+ public abstract void onCancel();
+
+ public void sendToOutPort(String port) {
+ putUserContent(WORKFLOW_STARTED, "TRUE", Scope.WORKFLOW);
+ String outJob = getCallbackContext().getTaskConfig().getConfigMap().get("OUT_" + port);
+ if (outJob != null) {
+ putUserContent(NEXT_JOB, outJob, Scope.WORKFLOW);
+ }
+ }
+
+ public RestTemplate getRestTemplate() {
+ return restTemplate;
+ }
+
+ public String getApiServerUrl() {
+ return apiServerUrl;
+ }
+
+ public ComputeOperations fetchComputeResourceOperation(ComputeResource computeResource) throws Exception {
+ ComputeOperations operations;
+ if ("SSH".equals(computeResource.getCommunicationType())) {
+ operations = new SSHComputeOperations(computeResource.getHost(), computeResource.getUserName(), computeResource.getPassword());
+ } else if ("Mock".equals(computeResource.getCommunicationType())) {
+ operations = new MockComputeOperation(computeResource.getHost());
+ } else {
+ throw new Exception("No compatible communication method {" + computeResource.getCommunicationType() + "} not found for compute resource " + computeResource.getName());
+ }
+ return operations;
+ }
+
+ public void initializeKafkaEventProducer() {
+ Properties props = new Properties();
+
+ props.put("bootstrap.servers", this.kafkaBootstrapUrl);
+
+ props.put("acks", "all");
+ props.put("retries", 0);
+ props.put("batch.size", 16384);
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", 33554432);
+ props.put("key.serializer",
+ "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer",
+ "org.apache.kafka.common.serialization.StringSerializer");
+
+ eventProducer = new KafkaProducer<String, String>(props);
+ }
+
+ public void publishTaskStatus(long status, String reason) {
+ eventProducer.send(new ProducerRecord<String, String>(
+ this.eventTopic, String.join(",", this.processId + "", this.taskId + "", status + "", reason)));
+ }
+
+ public long getProcessId() {
+ return processId;
+ }
+
+ public long getTaskId() {
+ return taskId;
+ }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataCollectingTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataCollectingTask.java
new file mode 100644
index 0000000..594ce35
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataCollectingTask.java
@@ -0,0 +1,32 @@
+package org.apache.airavata.helix.tasks;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class DataCollectingTask extends UserContentStore implements Task {
+
+ public static final String NAME = "DATA_COLLECTING";
+
+ public DataCollectingTask(TaskCallbackContext callbackContext) {
+ }
+
+ public TaskResult run() {
+ System.out.println("Executing data collecting");
+ putUserContent("Key", "Hooo", Scope.WORKFLOW);
+
+ return new TaskResult(TaskResult.Status.COMPLETED, "HelixTaskB completed!");
+
+ }
+
+ public void cancel() {
+
+ }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataPushingTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataPushingTask.java
new file mode 100644
index 0000000..fc7ee8b
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataPushingTask.java
@@ -0,0 +1,39 @@
+package org.apache.airavata.helix.tasks;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class DataPushingTask extends UserContentStore implements Task {
+
+ public static final String NAME = "DATA_PUSHING";
+
+ public DataPushingTask(TaskCallbackContext callbackContext) {
+ }
+
+ public TaskResult run() {
+ System.out.println("Executing data pushing");
+ try {
+ Thread.currentThread().sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ System.out.println("Continuing");
+ String key2 = getUserContent("Key2", Scope.WORKFLOW);
+
+ System.out.println(key2);
+ return new TaskResult(TaskResult.Status.COMPLETED, "HelixTaskB completed!");
+
+ }
+
+ public void cancel() {
+
+ }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/CommandTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/CommandTask.java
new file mode 100644
index 0000000..5aae5b7
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/CommandTask.java
@@ -0,0 +1,137 @@
+package org.apache.airavata.helix.tasks.command;
+
+import org.apache.airavata.helix.tasks.AbstractTask;
+import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
+import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskOutPortTypeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.airavata.k8s.compute.api.ExecutionResult;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+
+import java.util.Arrays;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class CommandTask extends AbstractTask {
+
+ public static final String NAME = "COMMAND";
+
+ private String command;
+ private String arguments;
+ private String stdOutPath;
+ private String stdErrPath;
+ private String computeResourceId;
+ private ComputeResource computeResource;
+
+ public CommandTask(TaskCallbackContext callbackContext) {
+ super(callbackContext);
+ }
+
+ @Override
+ public void init() {
+ this.command = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.COMMAND);
+ this.arguments = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.ARGUMENTS);
+ this.stdOutPath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.STD_OUT_PATH);
+ this.stdErrPath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.STD_ERR_PATH);
+ this.computeResourceId = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.COMPUTE_RESOURCE);
+ this.computeResource = this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
+ + "/compute/" + Long.parseLong(this.computeResourceId), ComputeResource.class);
+ }
+
+ public TaskResult onRun() {
+ System.out.println("Executing command " + command);
+ try {
+
+ String stdOutSuffix = " > " + stdOutPath + " 2> " + stdErrPath;
+
+ publishTaskStatus(TaskStatusResource.State.EXECUTING, "");
+
+ String finalCommand = command + (arguments != null ? arguments : "") + stdOutSuffix;
+
+ System.out.println("Executing command " + finalCommand);
+
+ ExecutionResult executionResult = fetchComputeResourceOperation(computeResource).executeCommand(finalCommand);
+
+ if (executionResult.getExitStatus() == 0) {
+ publishTaskStatus(TaskStatusResource.State.COMPLETED, "Task completed");
+ sendToOutPort("Out");
+ return new TaskResult(TaskResult.Status.COMPLETED, "Task completed");
+
+ } else if (executionResult.getExitStatus() == -1) {
+ publishTaskStatus(TaskStatusResource.State.FAILED, "Process didn't exit successfully");
+ sendToOutPort("Error");
+ return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
+
+ } else {
+ publishTaskStatus(TaskStatusResource.State.FAILED, "Process exited with error status " + executionResult.getExitStatus());
+ sendToOutPort("Error");
+ return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
+ }
+
+ } catch (Exception e) {
+
+ e.printStackTrace();
+ publishTaskStatus(TaskStatusResource.State.FAILED, e.getMessage());
+ return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
+ }
+ }
+
+ public void onCancel() {
+
+ }
+
+ public static TaskTypeResource getTaskType() {
+ TaskTypeResource taskTypeResource = new TaskTypeResource();
+ taskTypeResource.setName(NAME);
+ taskTypeResource.setTopicName("airavata-command");
+ taskTypeResource.setIcon("assets/icons/ssh.png");
+ taskTypeResource.getInputTypes().addAll(
+ Arrays.asList(
+ new TaskInputTypeResource()
+ .setName(PARAMS.COMMAND)
+ .setType("String")
+ .setDefaultValue(""),
+ new TaskInputTypeResource()
+ .setName(PARAMS.ARGUMENTS)
+ .setType("String")
+ .setDefaultValue(""),
+ new TaskInputTypeResource()
+ .setName(PARAMS.COMPUTE_RESOURCE)
+ .setType("Long")
+ .setDefaultValue(""),
+ new TaskInputTypeResource()
+ .setName(PARAMS.STD_OUT_PATH)
+ .setType("String")
+ .setDefaultValue(""),
+ new TaskInputTypeResource()
+ .setName(PARAMS.STD_ERR_PATH)
+ .setType("String")
+ .setDefaultValue("")));
+
+ taskTypeResource.getOutPorts().addAll(
+ Arrays.asList(
+ new TaskOutPortTypeResource()
+ .setName("Out")
+ .setOrder(0),
+ new TaskOutPortTypeResource()
+ .setName("Error")
+ .setOrder(1))
+ );
+
+ return taskTypeResource;
+ }
+
+ public static final class PARAMS {
+ public static final String COMMAND = "command";
+ public static final String ARGUMENTS = "arguments";
+ public static final String STD_OUT_PATH = "std_out_path";
+ public static final String STD_ERR_PATH = "std_err_path";
+ public static final String COMPUTE_RESOURCE = "compute_resource";
+ }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/Participant.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/Participant.java
new file mode 100644
index 0000000..f852d8f
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/Participant.java
@@ -0,0 +1,55 @@
+package org.apache.airavata.helix.tasks.command;
+
+import org.apache.airavata.helix.HelixParticipant;
+import org.apache.airavata.helix.tasks.DataCollectingTask;
+import org.apache.airavata.helix.tasks.DataPushingTask;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class Participant extends HelixParticipant {
+
+ public Participant(String zkAddress, String clusterName, String participantName, String taskTypeName, String apiServerUrl) {
+ super(zkAddress, clusterName, participantName, taskTypeName, apiServerUrl);
+ }
+
+ @Override
+ public Map<String, TaskFactory> getTaskFactory() {
+ Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+
+ TaskFactory commandTaskFac = new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new CommandTask(context);
+ }
+ };
+
+ taskRegistry.put(CommandTask.NAME, commandTaskFac);
+
+ return taskRegistry;
+ }
+
+ @Override
+ public TaskTypeResource getTaskType() {
+ return CommandTask.getTaskType();
+ }
+
+ public static void main(String args[]) {
+ HelixParticipant participant = new Participant(
+ "localhost:2199",
+ "AiravataDemoCluster",
+ "command-p1", CommandTask.NAME,
+ "localhost:8080");
+ new Thread(participant).start();
+ }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/DataInputTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/DataInputTask.java
new file mode 100644
index 0000000..41d6aa4
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/DataInputTask.java
@@ -0,0 +1,123 @@
+package org.apache.airavata.helix.tasks.datain;
+
+import org.apache.airavata.helix.tasks.AbstractTask;
+import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
+import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskOutPortTypeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.UUID;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class DataInputTask extends AbstractTask {
+
+ public static final String NAME = "DATA_INPUT";
+
+ private String remoteSourcePath;
+ private String targetPath;
+ private String computeResourceId;
+ private ComputeResource computeResource;
+
+ public DataInputTask(TaskCallbackContext callbackContext) {
+ super(callbackContext);
+ }
+
+ @Override
+ public void init() {
+ this.remoteSourcePath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.REMOTE_SOURCE_PATH);
+ this.targetPath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.TARGET_PATH);
+ this.computeResourceId = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.COMPUTE_RESOURCE);
+ this.computeResource = this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
+ + "/compute/" + Long.parseLong(this.computeResourceId), ComputeResource.class);
+ }
+
+ @Override
+ public TaskResult onRun() {
+ try {
+ String tempFilePath = "/tmp/" + UUID.randomUUID().toString();
+ System.out.println("Creating tmp file " + tempFilePath);
+ publishTaskStatus(TaskStatusResource.State.EXECUTING, "");
+
+ if (remoteSourcePath.startsWith("http")) {
+ System.out.println("Downloading text file " + remoteSourcePath);
+ FileUtils.copyURLToFile(new URL(remoteSourcePath), new File(tempFilePath));
+
+ } else {
+ publishTaskStatus(TaskStatusResource.State.FAILED, "Unsupported source type");
+ sendToOutPort("Error");
+ return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
+
+ }
+
+ System.out.println("Transferring file to remote resource");
+ fetchComputeResourceOperation(computeResource).transferDataIn(tempFilePath, this.targetPath, "SCP");
+
+ publishTaskStatus(TaskStatusResource.State.COMPLETED, "Task completed");
+ sendToOutPort("Out");
+ return new TaskResult(TaskResult.Status.COMPLETED, "Task completed");
+
+ } catch (Exception e) {
+
+ e.printStackTrace();
+ publishTaskStatus(TaskStatusResource.State.FAILED, e.getMessage());
+ sendToOutPort("Error");
+ return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
+ }
+ }
+
+ @Override
+ public void onCancel() {
+
+ }
+
+ public static TaskTypeResource getTaskType() {
+ TaskTypeResource taskTypeResource = new TaskTypeResource();
+ taskTypeResource.setName(NAME);
+ taskTypeResource.setTopicName("airavata-data-collect");
+ taskTypeResource.setIcon("assets/icons/copy.png");
+ taskTypeResource.getInputTypes().addAll(
+ Arrays.asList(
+ new TaskInputTypeResource()
+ .setName(PARAMS.REMOTE_SOURCE_PATH)
+ .setType("String")
+ .setDefaultValue(""),
+ new TaskInputTypeResource()
+ .setName(PARAMS.TARGET_PATH)
+ .setType("String")
+ .setDefaultValue(""),
+ new TaskInputTypeResource()
+ .setName(PARAMS.COMPUTE_RESOURCE)
+ .setType("Long")
+ .setDefaultValue("")));
+
+ taskTypeResource.getOutPorts().addAll(
+ Arrays.asList(
+ new TaskOutPortTypeResource()
+ .setName("Out")
+ .setOrder(0),
+ new TaskOutPortTypeResource()
+ .setName("Error")
+ .setOrder(1))
+ );
+
+ return taskTypeResource;
+ }
+
+ public static final class PARAMS {
+ public static final String REMOTE_SOURCE_PATH = "remote_source_path";
+ public static final String TARGET_PATH = "target_path";
+ public static final String COMPUTE_RESOURCE = "compute_resource";
+ }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/Participant.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/Participant.java
new file mode 100644
index 0000000..f06f56b
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/Participant.java
@@ -0,0 +1,56 @@
+package org.apache.airavata.helix.tasks.datain;
+
+import org.apache.airavata.helix.HelixParticipant;
+import org.apache.airavata.helix.tasks.command.CommandTask;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class Participant extends HelixParticipant {
+
+ public Participant(String zkAddress, String clusterName, String participantName,
+ String taskTypeName,
+ String apiServerUrl) {
+ super(zkAddress, clusterName, participantName, taskTypeName, apiServerUrl);
+ }
+
+ @Override
+ public Map<String, TaskFactory> getTaskFactory() {
+ Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+
+ TaskFactory dataInTask = new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new DataInputTask(context);
+ }
+ };
+
+ taskRegistry.put(DataInputTask.NAME, dataInTask);
+
+ return taskRegistry;
+ }
+
+ @Override
+ public TaskTypeResource getTaskType() {
+ return DataInputTask.getTaskType();
+ }
+
+ public static void main(String args[]) {
+ HelixParticipant participant = new Participant(
+ "localhost:2199",
+ "AiravataDemoCluster",
+ "data-in-p1", DataInputTask.NAME,
+ "localhost:8080");
+ new Thread(participant).start();
+ }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/DataOutputTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/DataOutputTask.java
new file mode 100644
index 0000000..314e912
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/DataOutputTask.java
@@ -0,0 +1,128 @@
+package org.apache.airavata.helix.tasks.dataout;
+
+import org.apache.airavata.helix.tasks.AbstractTask;
+import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
+import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskOutPortTypeResource;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.util.LinkedMultiValueMap;
+
+import java.util.Arrays;
+import java.util.UUID;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class DataOutputTask extends AbstractTask {
+
+ public static final String NAME = "DATA_OUTPUT";
+
+ private String sourcePath;
+ private String identifier;
+ private String computeResourceId;
+ private ComputeResource computeResource;
+
+ public DataOutputTask(TaskCallbackContext callbackContext) {
+ super(callbackContext);
+ }
+
+ @Override
+ public void init() {
+ this.sourcePath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.SOURCE_PATH);
+ this.identifier = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.IDENTIFIER);
+ this.computeResourceId = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.COMPUTE_RESOURCE);
+ this.computeResource = this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
+ + "/compute/" + Long.parseLong(this.computeResourceId), ComputeResource.class);
+ }
+
+ @Override
+ public TaskResult onRun() {
+ try {
+ publishTaskStatus(TaskStatusResource.State.EXECUTING, "");
+
+ String temporaryFile = "/tmp/" + UUID.randomUUID().toString();
+ System.out.println("Downloading " + sourcePath + " to " + temporaryFile + " from compute resource "
+ + computeResource.getName());
+
+ fetchComputeResourceOperation(computeResource).transferDataOut(sourcePath, temporaryFile, "SCP");
+
+ LinkedMultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
+ map.add("file", new FileSystemResource(temporaryFile));
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.MULTIPART_FORM_DATA);
+
+ HttpEntity<LinkedMultiValueMap<String, Object>> requestEntity = new HttpEntity<>(map, headers);
+
+ System.out.println("Uploading data file with task id " + getTaskId() + " and identifier "
+ + identifier + " to data store");
+
+ getRestTemplate().exchange("http://" + getApiServerUrl() + "/data/" + getTaskId() + "/"
+ + identifier + "/upload", HttpMethod.POST, requestEntity, Long.class);
+
+ publishTaskStatus(TaskStatusResource.State.COMPLETED, "Task completed");
+ sendToOutPort("Out");
+ return new TaskResult(TaskResult.Status.COMPLETED, "Task completed");
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ publishTaskStatus(TaskStatusResource.State.FAILED, e.getMessage());
+ sendToOutPort("Error");
+ return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task completed");
+
+ }
+ }
+
+ @Override
+ public void onCancel() {
+
+ }
+
+ public static final class PARAMS {
+ public static final String SOURCE_PATH = "source_path";
+ public static final String COMPUTE_RESOURCE = "compute_resource";
+ public static final String IDENTIFIER = "identifier";
+ }
+
+ public static TaskTypeResource getTaskType() {
+ TaskTypeResource taskTypeResource = new TaskTypeResource();
+ taskTypeResource.setName(NAME);
+ taskTypeResource.setTopicName("airavata-data-collect");
+ taskTypeResource.setIcon("assets/icons/copy.png");
+ taskTypeResource.getInputTypes().addAll(
+ Arrays.asList(
+ new TaskInputTypeResource()
+ .setName(PARAMS.SOURCE_PATH)
+ .setType("String")
+ .setDefaultValue(""),
+ new TaskInputTypeResource()
+ .setName(PARAMS.IDENTIFIER)
+ .setType("String"),
+ new TaskInputTypeResource()
+ .setName(PARAMS.COMPUTE_RESOURCE)
+ .setType("Long")));
+
+ taskTypeResource.getOutPorts().addAll(
+ Arrays.asList(
+ new TaskOutPortTypeResource()
+ .setName("Out")
+ .setOrder(0),
+ new TaskOutPortTypeResource()
+ .setName("Error")
+ .setOrder(1))
+ );
+
+ return taskTypeResource;
+
+ }
+}
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/Participant.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/Participant.java
new file mode 100644
index 0000000..b226511
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/Participant.java
@@ -0,0 +1,54 @@
+package org.apache.airavata.helix.tasks.dataout;
+
+import org.apache.airavata.helix.HelixParticipant;
+import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class Participant extends HelixParticipant {
+
+ public Participant(String zkAddress, String clusterName, String participantName,
+ String taskTypeName, String apiServerUrl) {
+ super(zkAddress, clusterName, participantName, taskTypeName, apiServerUrl);
+ }
+
+ @Override
+ public Map<String, TaskFactory> getTaskFactory() {
+ Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+
+ TaskFactory dataInTask = new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new DataOutputTask(context);
+ }
+ };
+
+ taskRegistry.put(DataOutputTask.NAME, dataInTask);
+
+ return taskRegistry;
+ }
+
+ @Override
+ public TaskTypeResource getTaskType() {
+ return DataOutputTask.getTaskType();
+ }
+
+ public static void main(String args[]) {
+ HelixParticipant participant = new Participant(
+ "localhost:2199",
+ "AiravataDemoCluster",
+ "data-out-p1", DataOutputTask.NAME,
+ "localhost:8080");
+ new Thread(participant).start();
+ }
+}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/resources/log4j.properties b/airavata-kubernetes/modules/helix-tasks/src/main/resources/log4j.properties
new file mode 100644
index 0000000..5e31e3c
--- /dev/null
+++ b/airavata-kubernetes/modules/helix-tasks/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/DataStoreController.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/DataStoreController.java
index 016011d..2a7f073 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/DataStoreController.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/DataStoreController.java
@@ -46,18 +46,19 @@ public class DataStoreController {
@Resource
private DataStoreService dataStoreService;
- @PostMapping("{taskId}/{expOutputId}/upload")
+ @PostMapping("{taskId}/{identifier}/upload")
public long uploadData(@RequestParam("file") MultipartFile file, @PathVariable("taskId") long taskId,
- @PathVariable("expOutputId") long expOutputId, RedirectAttributes redirectAttributes) {
+ @PathVariable("identifier") String identifier, RedirectAttributes redirectAttributes) {
- System.out.println("Received data for task id " + taskId + " and experiment output id " + expOutputId);
+ System.out.println("Received data for task id " + taskId + " and identifier " + identifier);
if (file.isEmpty()) {
throw new ServerRuntimeException("Data file is empty");
}
try {
// Get the file and save it somewhere
byte[] bytes = file.getBytes();
- return this.dataStoreService.createEntry(taskId, expOutputId, bytes);
+ return this.dataStoreService.createEntry(taskId, identifier, bytes);
+
} catch (IOException e) {
e.printStackTrace();
throw new ServerRuntimeException("Failed to store file", e);
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/data/DataStoreModel.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/data/DataStoreModel.java
index 71df7c4..6b42313 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/data/DataStoreModel.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/data/DataStoreModel.java
@@ -44,8 +44,7 @@ public class DataStoreModel {
@Basic(fetch = FetchType.LAZY)
private byte[] content;
- @ManyToOne
- private ExperimentOutputData experimentOutputData;
+ private String identifier;
@ManyToOne
private TaskModel taskModel;
@@ -68,12 +67,12 @@ public class DataStoreModel {
return this;
}
- public ExperimentOutputData getExperimentOutputData() {
- return experimentOutputData;
+ public String getIdentifier() {
+ return identifier;
}
- public DataStoreModel setExperimentOutputData(ExperimentOutputData experimentOutputData) {
- this.experimentOutputData = experimentOutputData;
+ public DataStoreModel setIdentifier(String identifier) {
+ this.identifier = identifier;
return this;
}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
index 68abab4..f4492fe 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
@@ -105,10 +105,10 @@ public class WorkflowService {
});
parseResult.getEdgeCache().forEach(((outPort, inPort) -> {
- if (outPort.getParentOperation() != null && outPort.getNextPort().getParentOperation() != null) {
+ if (outPort.getParentTask() != null && outPort.getNextPort().getParentTask() != null) {
Optional<TaskOutPort> sourceOutPort = taskOutPortRepository
- .findByReferenceIdAndTaskModel_Id(outPort.getId(), outPort.getParentTask().getId());
- Optional<TaskModel> targetTask = taskRepository.findById(inPort.getParentTask().getId());
+ .findByReferenceIdAndTaskModel_Id(outPort.getId(), outPort.getParentTask().getTaskResource().getId());
+ Optional<TaskModel> targetTask = taskRepository.findById(inPort.getParentTask().getTaskResource().getId());
taskDAGRepository.save(new TaskDAG()
.setSourceOutPort(sourceOutPort.get())
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/data/DataStoreService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/data/DataStoreService.java
index 272ab33..e775acc 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/data/DataStoreService.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/data/DataStoreService.java
@@ -51,10 +51,10 @@ public class DataStoreService {
this.experimentOutputDataRepository = experimentOutputDataRepository;
}
- public long createEntry(long taskId, long expOutId, byte[] content) {
+ public long createEntry(long taskId, String identifier, byte[] content) {
DataStoreModel model = new DataStoreModel();
model.setTaskModel(taskRepository.findById(taskId).get())
- .setExperimentOutputData(experimentOutputDataRepository.findById(expOutId).get())
+ .setIdentifier(identifier)
.setContent(content);
return dataStoreRepository.save(model).getId();
}
@@ -64,8 +64,7 @@ public class DataStoreService {
List<DataStoreModel> dataStoreModels = this.dataStoreRepository.findByTaskModel_ParentProcess_Id(processId);
Optional.ofNullable(dataStoreModels).ifPresent(models -> models.forEach(model -> entries.add(new DataEntryResource()
.setId(model.getId())
- .setName(model.getExperimentOutputData().getName())
- .setDataType(model.getExperimentOutputData().getType().name()))));
+ .setName(model.getIdentifier()))));
return entries;
}
}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
index acd94c9..22b970f 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
@@ -249,6 +249,10 @@ public class ToResourceUtil {
taskStatuses.forEach(taskStatus -> resource.getTaskStatus()
.add(toResource(taskStatus).get())));
+ Optional.ofNullable(taskModel.getTaskOutPorts())
+ .ifPresent(outPorts -> outPorts.forEach(outPort -> resource.getOutPorts()
+ .add(toResource(outPort).get())));
+
resource.setOrder(taskModel.getOrderIndex());
return Optional.of(resource);
} else {
@@ -442,7 +446,7 @@ public class ToResourceUtil {
resource.setId(outPort.getId());
resource.setReferenceId(outPort.getReferenceId());
resource.setName(outPort.getName());
- resource.setTaskResource(toResource(outPort.getTaskModel()).get());
+ //resource.setTaskResource(toResource(outPort.getTaskModel()).get());
return Optional.of(resource);
} else {
return Optional.empty();
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml b/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
index 6372cb1..8f502c8 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
@@ -57,6 +57,24 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>0.6.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.5</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.5</version>
+ </dependency>
</dependencies>
<build>
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
new file mode 100644
index 0000000..e9dead8
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
@@ -0,0 +1,162 @@
+package org.apache.airavata.k8s.gfac.core;
+
+import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
+import org.apache.airavata.k8s.api.resources.task.TaskResource;
+import org.apache.airavata.k8s.gfac.messaging.KafkaSender;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.*;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Op;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class HelixWorkflowManager {
+
+ private static final Logger logger = LogManager.getLogger(HelixWorkflowManager.class);
+
+ private long processId;
+ private List<TaskResource> tasks;
+
+ // out port id, next task id
+ private Map<Long, Long> edgeMap;
+
+ private KafkaSender kafkaSender;
+
+ // Todo abstract out these parameters to reusable class
+ private final RestTemplate restTemplate;
+ private String apiServerUrl;
+
+ public HelixWorkflowManager(long processId, List<TaskResource> tasks, Map<Long, Long> edgeMap,
+ KafkaSender kafkaSender,
+ RestTemplate restTemplate, String apiServerUrl) {
+ this.processId = processId;
+ this.tasks = tasks;
+ this.edgeMap = edgeMap;
+ this.kafkaSender = kafkaSender;
+ this.restTemplate = restTemplate;
+ this.apiServerUrl = apiServerUrl;
+ }
+
+ public void launchWorkflow() {
+ org.apache.helix.HelixManager helixManager = HelixManagerFactory.getZKHelixManager("AiravataDemoCluster", "Admin",
+ InstanceType.SPECTATOR, "localhost:2199");
+
+ try {
+
+ Workflow.Builder workflowBuilder = createWorkflow();
+ WorkflowConfig.Builder config = new WorkflowConfig.Builder().setFailureThreshold(0);
+ workflowBuilder.setWorkflowConfig(config.build());
+ if (workflowBuilder == null) {
+ throw new Exception("Failed to create a workflow for process id " + processId);
+ }
+
+ Workflow workflow = workflowBuilder.build();
+
+ helixManager.connect();
+ TaskDriver taskDriver = new TaskDriver(helixManager);
+
+ Runtime.getRuntime().addShutdownHook(
+ new Thread() {
+ @Override
+ public void run() {
+ helixManager.disconnect();
+ }
+ }
+ );
+
+ taskDriver.start(workflow);
+ logger.info("Started workflow");
+ TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED);
+ System.out.println("Workflow state " + taskState.name());
+
+ } catch (Exception ex) {
+ logger.error("Error in connect() for Admin, reason: " + ex, ex);
+ }
+ }
+
+ private Workflow.Builder createWorkflow() {
+ Optional<TaskResource> startingTask = tasks.stream().filter(TaskResource::isStartingTask).findFirst();
+ if (startingTask.isPresent()) {
+ Workflow.Builder workflow = new Workflow.Builder("Airavata_Process_" + processId).setExpiry(0);
+ createWorkflowRecursively(startingTask.get(), workflow, null);
+ return workflow;
+ } else {
+ System.out.println("No starting task for this process " + processId);
+ updateProcessStatus(ProcessStatusResource.State.CANCELED, "No starting task for this process");
+ return null;
+ }
+ }
+
+ private void createWorkflowRecursively(TaskResource taskResource, Workflow.Builder workflow, Long parentTaskId) {
+
+ TaskConfig.Builder taskBuilder = new TaskConfig.Builder().setTaskId("Task_" + taskResource.getId())
+ .setCommand(taskResource.getTaskType().getName());
+
+ Optional.ofNullable(taskResource.getInputs()).ifPresent(inputs -> inputs.forEach(input -> {
+ taskBuilder.addConfig(input.getName(), input.getValue());
+ }));
+
+ taskBuilder.addConfig("task_id", taskResource.getId() + "");
+ taskBuilder.addConfig("process_id", taskResource.getParentProcessId() + "");
+
+ Optional.ofNullable(taskResource.getOutPorts()).ifPresent(outPorts -> outPorts.forEach(outPort -> {
+ Optional.ofNullable(edgeMap.get(outPort.getId())).ifPresent(nextTask -> {
+ Optional<TaskResource> nextTaskResource = tasks.stream().filter(task -> task.getId() == nextTask).findFirst();
+ nextTaskResource.ifPresent(t -> {
+ taskBuilder.addConfig("OUT_"+ outPort.getName(), "JOB_" + t.getId());
+ });
+ });
+ }));
+
+
+ List<TaskConfig> taskBuilds = new ArrayList<>();
+ taskBuilds.add(taskBuilder.build());
+
+ JobConfig.Builder job = new JobConfig.Builder()
+ .addTaskConfigs(taskBuilds)
+ .setFailureThreshold(0)
+ .setMaxAttemptsPerTask(3)
+ .setInstanceGroupTag(taskResource.getTaskType().getName());
+
+ workflow.addJob(("JOB_" + taskResource.getId()), job);
+ if (parentTaskId != null) {
+ workflow.addParentChildDependency("JOB_" + parentTaskId, "JOB_" + taskResource.getId());
+ }
+
+ Optional.ofNullable(taskResource.getOutPorts()).ifPresent(outPorts -> outPorts.forEach(outPort -> {
+ Optional.ofNullable(edgeMap.get(outPort.getId())).ifPresent(nextTask -> {
+ Optional<TaskResource> nextTaskResource = tasks.stream().filter(task -> task.getId() == nextTask).findFirst();
+ nextTaskResource.ifPresent(t -> {
+
+ createWorkflowRecursively(t, workflow, taskResource.getId());
+ });
+ });
+ }));
+ }
+
+ private void updateProcessStatus(ProcessStatusResource.State state) {
+ updateProcessStatus(state, "");
+ }
+
+ private void updateProcessStatus(ProcessStatusResource.State state, String reason) {
+ this.restTemplate.postForObject("http://" + apiServerUrl + "/process/" + this.processId + "/status",
+ new ProcessStatusResource()
+ .setState(state.getValue())
+ .setReason(reason)
+ .setTimeOfStateChange(System.currentTimeMillis()),
+ Long.class);
+ }
+
+}
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
index 42aa43d..6a02975 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
@@ -41,10 +41,10 @@ public class KafkaReceiver {
System.out.println("received process=" + payload);
workerService.launchProcess(Long.parseLong(payload));
}
-
- @KafkaListener(topics = "${task.event.topic.name}", containerFactory = "kafkaEventListenerContainerFactory")
- public void receiveTaskEvent(TaskContext taskContext) {
- System.out.println("received event for task id =" + taskContext.getTaskId());
- workerService.onTaskStateEvent(taskContext);
- }
+//
+// @KafkaListener(topics = "${task.event.topic.name}", containerFactory = "kafkaEventListenerContainerFactory")
+// public void receiveTaskEvent(TaskContext taskContext) {
+// System.out.println("received event for task id =" + taskContext.getTaskId());
+// workerService.onTaskStateEvent(taskContext);
+// }
}
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java
new file mode 100644
index 0000000..05a1c55
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java
@@ -0,0 +1,17 @@
+package org.apache.airavata.k8s.gfac.service;
+
+import org.springframework.stereotype.Service;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Service
+public class HelixWorkflowService {
+
+ public void launchProcess(long processId) {
+
+ }
+}
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
index 5449951..1ccf1d1 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
@@ -23,14 +23,19 @@ import org.apache.airavata.k8s.api.resources.process.ProcessResource;
import org.apache.airavata.k8s.api.resources.task.TaskDagResource;
import org.apache.airavata.k8s.api.resources.task.TaskResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
+import org.apache.airavata.k8s.gfac.core.HelixWorkflowManager;
import org.apache.airavata.k8s.gfac.core.ProcessLifeCycleManager;
import org.apache.airavata.k8s.gfac.messaging.KafkaSender;
import org.apache.airavata.k8s.task.api.TaskContext;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* TODO: Class level comments please
@@ -44,6 +49,7 @@ public class WorkerService {
private final RestTemplate restTemplate;
private final KafkaSender kafkaSender;
private Map<Long, ProcessLifeCycleManager> processLifecycleStore = new HashMap<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
@Value("${api.server.url}")
private String apiServerUrl;
@@ -59,8 +65,9 @@ public class WorkerService {
ProcessResource.class);
List<TaskResource> taskResources = processResource.getTasks();
- Set<TaskDagResource> takDagSet = this.restTemplate.getForObject("http://" + apiServerUrl + "/task/dag/"
- + processId, Set.class);
+ Set<TaskDagResource> takDagSet = this.restTemplate.exchange("http://" + apiServerUrl + "/task/dag/"
+ + processId, HttpMethod.GET, null, new ParameterizedTypeReference<Set<TaskDagResource>>() {})
+ .getBody();
final Map<Long, Long> edgeMap = new HashMap<>();
Optional.ofNullable(takDagSet)
@@ -68,12 +75,22 @@ public class WorkerService {
edgeMap.put(dag.getSourceOutPort().getId(), dag.getTargetTask().getId())));
System.out.println("Starting to execute process " + processId);
- ProcessLifeCycleManager manager =
- new ProcessLifeCycleManager(processId, taskResources, edgeMap, kafkaSender, restTemplate, apiServerUrl);
+ //ProcessLifeCycleManager manager =
+ // new ProcessLifeCycleManager(processId, taskResources, edgeMap, kafkaSender, restTemplate, apiServerUrl);
- manager.init();
- manager.start();
- processLifecycleStore.put(processId, manager);
+ //manager.init();
+ //manager.start();
+
+ //processLifecycleStore.put(processId, manager);
+
+ final HelixWorkflowManager helixWorkflowManager = new HelixWorkflowManager(processId, taskResources, edgeMap, kafkaSender, restTemplate, apiServerUrl);
+
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ helixWorkflowManager.launchWorkflow();
+ }
+ });
}
public void onTaskStateEvent(TaskContext taskContext) {
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/log4j.properties b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/log4j.properties
new file mode 100644
index 0000000..0a185ad
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=ERROR, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
diff --git a/airavata-kubernetes/pom.xml b/airavata-kubernetes/pom.xml
index c758f70..222fffe 100644
--- a/airavata-kubernetes/pom.xml
+++ b/airavata-kubernetes/pom.xml
@@ -40,6 +40,7 @@
<module>modules/microservices/tasks/data-pushing-task</module>
<module>modules/microservices/tasks/data-collecting-task</module>
<module>modules/task-api</module>
+ <module>modules/helix-tasks</module>
</modules>
<dependencyManagement>
--
To stop receiving notification emails like this one, please contact
"commits@airavata.apache.org" <co...@airavata.apache.org>.