You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:09:58 UTC
[airavata] 01/17: Initial helix migration
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit c9a1b064bb71dd936bb7f480bf4d7703ba7ad9a0
Author: dimuthu <di...@gmail.com>
AuthorDate: Wed Feb 21 13:31:29 2018 -0500
Initial helix migration
---
modules/airavata-helix/agent-api/pom.xml | 47 +++
.../apache/airavata/agents/api/AdaptorParams.java | 26 ++
.../java/org/apache/airavata/agents/api/Agent.java | 10 +
.../apache/airavata/agents/api/AgentAdaptor.java | 23 ++
.../apache/airavata/agents/api/AgentException.java | 30 ++
.../org/apache/airavata/agents/api/AgentStore.java | 103 +++++
.../apache/airavata/agents/api/CommandOutput.java | 16 +
.../airavata/agents/api/JobSubmissionOutput.java | 74 ++++
modules/airavata-helix/agent-impl/pom.xml | 27 ++
.../airavata-helix/agent-impl/ssh-agent/pom.xml | 76 ++++
.../helix/agent/local/LocalAgentAdaptor.java | 43 +++
.../airavata/helix/agent/ssh/SshAdaptorParams.java | 116 ++++++
.../airavata/helix/agent/ssh/SshAgentAdaptor.java | 430 +++++++++++++++++++++
.../helix/agent/ssh/StandardOutReader.java | 83 ++++
modules/airavata-helix/pom.xml | 24 ++
modules/airavata-helix/task-api/pom.xml | 41 ++
.../apache/airavata/helix/task/api/TaskHelper.java | 13 +
.../helix/task/api/annotation/TaskDef.java | 18 +
.../helix/task/api/annotation/TaskOutPort.java | 18 +
.../helix/task/api/annotation/TaskParam.java | 20 +
.../helix/task/api/support/AdaptorSupport.java | 52 +++
.../src/main/resources/application.properties | 3 +
.../task-api/src/main/resources/log4j.properties | 9 +
modules/airavata-helix/task-core/pom.xml | 47 +++
.../apache/airavata/helix/core/AbstractTask.java | 108 ++++++
.../org/apache/airavata/helix/core/OutPort.java | 44 +++
.../helix/core/controller/HelixController.java | 91 +++++
.../helix/core/participant/HelixParticipant.java | 171 ++++++++
.../helix/core/support/AdaptorSupportImpl.java | 47 +++
.../helix/core/support/TaskHelperImpl.java | 16 +
.../airavata/helix/core/util/PropertyResolver.java | 44 +++
.../apache/airavata/helix/core/util/TaskUtil.java | 103 +++++
modules/airavata-helix/workflow-impl/pom.xml | 44 +++
.../airavata/helix/workflow/SimpleWorkflow.java | 40 ++
.../airavata/helix/workflow/WorkflowManager.java | 94 +++++
modules/helix-spectator/pom.xml | 50 +++
.../helix/impl/participant/GlobalParticipant.java | 68 ++++
.../airavata/helix/impl/task/AiravataTask.java | 293 ++++++++++++++
.../airavata/helix/impl/task/DataStagingTask.java | 19 +
.../airavata/helix/impl/task/EnvSetupTask.java | 64 +++
.../helix/impl/task/submission/GroovyMapData.java | 415 ++++++++++++++++++++
.../helix/impl/task/submission/Script.java | 43 +++
.../helix/impl/task/submission/ScriptTag.java | 13 +
.../helix/impl/task/submission/SubmissionUtil.java | 10 +
.../impl/task/submission/config/JobFactory.java | 102 +++++
.../submission/config/JobManagerConfiguration.java | 29 ++
.../impl/task/submission/config/OutputParser.java | 41 ++
.../task/submission/config/RawCommandInfo.java | 22 ++
.../config/imp/ForkJobConfiguration.java | 113 ++++++
.../impl/task/submission/config/imp/JobUtil.java | 58 +++
.../submission/config/imp/LSFJobConfiguration.java | 120 ++++++
.../submission/config/imp/PBSJobConfiguration.java | 122 ++++++
.../config/imp/SlurmJobConfiguration.java | 117 ++++++
.../submission/config/imp/UGEJobConfiguration.java | 117 ++++++
.../parser/AiravataCustomCommandOutputParser.java | 56 +++
.../config/imp/parser/ForkOutputParser.java | 58 +++
.../config/imp/parser/LSFOutputParser.java | 132 +++++++
.../config/imp/parser/PBSOutputParser.java | 142 +++++++
.../config/imp/parser/SlurmOutputParser.java | 137 +++++++
.../config/imp/parser/UGEOutputParser.java | 108 ++++++
.../submission/task/DefaultJobSubmissionTask.java | 232 +++++++++++
.../submission/task/ForkJobSubmissionTask.java | 79 ++++
.../task/submission/task/JobSubmissionTask.java | 202 ++++++++++
.../submission/task/LocalJobSubmissionTask.java | 81 ++++
.../helix/impl/workflow/SimpleWorkflow.java | 31 ++
.../src/main/resources/airavata-server.properties | 334 ++++++++++++++++
.../src/main/resources/application.properties | 3 +
.../src/main/resources/log4j.properties | 11 +
pom.xml | 2 +
69 files changed, 5575 insertions(+)
diff --git a/modules/airavata-helix/agent-api/pom.xml b/modules/airavata-helix/agent-api/pom.xml
new file mode 100644
index 0000000..02ee48a
--- /dev/null
+++ b/modules/airavata-helix/agent-api/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-helix</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>agent-api</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.9.13</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-registry-cpi</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-registry-core</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <!--<build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>-->
+
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AdaptorParams.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AdaptorParams.java
new file mode 100644
index 0000000..ca6f80a
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AdaptorParams.java
@@ -0,0 +1,26 @@
+package org.apache.airavata.agents.api;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class AdaptorParams {
+
+ public Object loadFromFile(File file) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(file, this.getClass());
+ }
+
+ public void writeToFile(File file) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.writeValue(file, this);
+ }
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/Agent.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/Agent.java
new file mode 100644
index 0000000..f48aa3e
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/Agent.java
@@ -0,0 +1,10 @@
+package org.apache.airavata.agents.api;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class Agent {
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
new file mode 100644
index 0000000..2d295de
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
@@ -0,0 +1,23 @@
+package org.apache.airavata.agents.api;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public interface AgentAdaptor {
+
+ public void init(String computeResource, String gatewayId, String userId, String token) throws AgentException;
+
+ public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException;
+
+ public void createDirectory(String path) throws AgentException;
+
+ public void copyFile(String sourceFile, String destinationFile) throws AgentException;
+
+ public List<String> listDirectory(String path) throws AgentException;
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentException.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentException.java
new file mode 100644
index 0000000..9dfe50e
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentException.java
@@ -0,0 +1,30 @@
+package org.apache.airavata.agents.api;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class AgentException extends Exception {
+
+ public AgentException() {
+ super();
+ }
+
+ public AgentException(String message) {
+ super(message);
+ }
+
+ public AgentException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public AgentException(Throwable cause) {
+ super(cause);
+ }
+
+ protected AgentException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentStore.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentStore.java
new file mode 100644
index 0000000..78f2276
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentStore.java
@@ -0,0 +1,103 @@
+package org.apache.airavata.agents.api;
+
+import java.io.*;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class AgentStore {
+ public AgentAdaptor fetchAdaptor(String computeResource, String Protocol, String authToken) throws AgentException {
+
+ AgentData agentData = getAgentDataForComputeResource(computeResource);
+
+ try {
+ URL[] urls = new URL[1];
+ urls[0] = new URL(agentData.getLibraryLocation());
+ URLClassLoader classLoader = new URLClassLoader(urls, AgentAdaptor.class.getClassLoader());
+
+ Class<?> clazz = classLoader.loadClass(agentData.getAdaptorClass());
+ AgentAdaptor agentAdaptor = (AgentAdaptor) clazz.newInstance();
+
+ Class<?> paramClazz = classLoader.loadClass(agentData.paramClass);
+ AdaptorParams adaptorParams = (AdaptorParams) paramClazz.newInstance();
+
+ Object paramsInit = adaptorParams.loadFromFile(new File(agentData.paramDataFile));
+
+ //agentAdaptor.init(paramsInit);
+ System.out.println("Done");
+
+ return agentAdaptor;
+
+ } catch (IllegalAccessException | ClassNotFoundException | InstantiationException | IOException e) {
+ e.printStackTrace();
+ throw new AgentException("Failed to fetch agent adaptor for compute resource " + computeResource, e);
+ }
+ }
+
+ public static void main(String args[]) throws InstantiationException, IOException, AgentException {
+ AgentStore store = new AgentStore();
+
+ AgentAdaptor agentAdaptor = store.fetchAdaptor("localhost", null, null);
+ System.out.println("Agent loaded");
+ }
+
+ private AgentData getAgentDataForComputeResource(String computeResource) {
+ if ("localhost".equals(computeResource)) {
+ return new AgentData().setLibraryLocation("file:///Users/dimuthu/code/fork/airavata-sandbox/airavata-helix/modules/agent-impl/ssh-agent/target/ssh-agent-1.0-SNAPSHOT-jar-with-dependencies.jar")
+ .setAdaptorClass("org.apache.airavata.helix.agent.ssh.SshAgentAdaptor")
+ .setParamClass("org.apache.airavata.helix.agent.ssh.SshAdaptorParams")
+ .setParamDataFile("/tmp/ssh-param.json");
+ }
+
+ return null;
+ }
+
+ public static class AgentData {
+
+ private String libraryLocation;
+ private String adaptorClass;
+ private String paramClass;
+ private String paramDataFile;
+
+ public String getLibraryLocation() {
+ return libraryLocation;
+ }
+
+ public AgentData setLibraryLocation(String libraryLocation) {
+ this.libraryLocation = libraryLocation;
+ return this;
+ }
+
+ public String getAdaptorClass() {
+ return adaptorClass;
+ }
+
+ public AgentData setAdaptorClass(String adaptorClass) {
+ this.adaptorClass = adaptorClass;
+ return this;
+ }
+
+ public String getParamClass() {
+ return paramClass;
+ }
+
+ public AgentData setParamClass(String paramClass) {
+ this.paramClass = paramClass;
+ return this;
+ }
+
+ public String getParamDataFile() {
+ return paramDataFile;
+ }
+
+ public AgentData setParamDataFile(String paramDataFile) {
+ this.paramDataFile = paramDataFile;
+ return this;
+ }
+ }
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/CommandOutput.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/CommandOutput.java
new file mode 100644
index 0000000..94a0118
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/CommandOutput.java
@@ -0,0 +1,16 @@
+package org.apache.airavata.agents.api;
+
+import java.io.OutputStream;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public interface CommandOutput {
+
+ String getStdOut();
+ String getStdError();
+ Integer getExitCode();
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java
new file mode 100644
index 0000000..1858826
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java
@@ -0,0 +1,74 @@
+package org.apache.airavata.agents.api;
+
+public class JobSubmissionOutput {
+ private int exitCode = Integer.MIN_VALUE;
+ private String stdOut;
+ private String stdErr;
+ private String command;
+ private String jobId;
+ private boolean isJobSubmissionFailed;
+ private String failureReason;
+
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ public JobSubmissionOutput setExitCode(int exitCode) {
+ this.exitCode = exitCode;
+ return this;
+ }
+
+ public String getStdOut() {
+ return stdOut;
+ }
+
+ public JobSubmissionOutput setStdOut(String stdOut) {
+ this.stdOut = stdOut;
+ return this;
+ }
+
+ public String getStdErr() {
+ return stdErr;
+ }
+
+ public JobSubmissionOutput setStdErr(String stdErr) {
+ this.stdErr = stdErr;
+ return this;
+ }
+
+ public String getCommand() {
+ return command;
+ }
+
+ public JobSubmissionOutput setCommand(String command) {
+ this.command = command;
+ return this;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public JobSubmissionOutput setJobId(String jobId) {
+ this.jobId = jobId;
+ return this;
+ }
+
+ public boolean isJobSubmissionFailed() {
+ return isJobSubmissionFailed;
+ }
+
+ public JobSubmissionOutput setJobSubmissionFailed(boolean jobSubmissionFailed) {
+ isJobSubmissionFailed = jobSubmissionFailed;
+ return this;
+ }
+
+ public String getFailureReason() {
+ return failureReason;
+ }
+
+ public JobSubmissionOutput setFailureReason(String failureReason) {
+ this.failureReason = failureReason;
+ return this;
+ }
+}
diff --git a/modules/airavata-helix/agent-impl/pom.xml b/modules/airavata-helix/agent-impl/pom.xml
new file mode 100644
index 0000000..57f0c08
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/pom.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-helix</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>agent-impl</artifactId>
+ <packaging>pom</packaging>
+ <modules>
+ <module>ssh-agent</module>
+ </modules>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>agent-api</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/pom.xml b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
new file mode 100644
index 0000000..44cf919
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>agent-impl</artifactId>
+ <groupId>org.apache</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>ssh-agent</artifactId>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ <version>0.1.53</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-registry-cpi</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-registry-core</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-credential-store</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <!-- get all project dependencies -->
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <!-- MainClass in mainfest make a executable jar -->
+
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <!-- bind to the packaging phase -->
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java
new file mode 100644
index 0000000..af507bf
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java
@@ -0,0 +1,43 @@
+package org.apache.airavata.helix.agent.local;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.agents.api.JobSubmissionOutput;
+
+import java.io.File;
+import java.util.List;
+
+public class LocalAgentAdaptor implements AgentAdaptor {
+
+
+
+ public void init(Object agentPams) throws AgentException {
+
+ }
+
+ @Override
+ public void init(String computeResource, String gatewayId, String userId, String token) throws AgentException {
+
+ }
+
+ @Override
+ public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
+ return null;
+ }
+
+ @Override
+ public void createDirectory(String path) throws AgentException {
+
+ }
+
+ @Override
+ public void copyFile(String sourceFile, String destinationFile) throws AgentException {
+
+ }
+
+ @Override
+ public List<String> listDirectory(String path) throws AgentException {
+ return null;
+ }
+}
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAdaptorParams.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAdaptorParams.java
new file mode 100644
index 0000000..f54ae60
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAdaptorParams.java
@@ -0,0 +1,116 @@
+package org.apache.airavata.helix.agent.ssh;
+
+import org.apache.airavata.agents.api.AdaptorParams;
+
+import java.io.*;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class SshAdaptorParams extends AdaptorParams implements Serializable {
+
+ private int port = 22;
+ private String hostName;
+ private String userName;
+
+ private String password;
+
+ private byte[] publicKey;
+ private byte[] privateKey;
+ private String passphrase;
+
+ private String knownHostsFilePath;
+ private boolean strictHostKeyChecking;
+
+ public int getPort() {
+ return port;
+ }
+
+ public SshAdaptorParams setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public SshAdaptorParams setHostName(String hostName) {
+ this.hostName = hostName;
+ return this;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public SshAdaptorParams setUserName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public SshAdaptorParams setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ public byte[] getPublicKey() {
+ return publicKey;
+ }
+
+ public SshAdaptorParams setPublicKey(byte[] publicKey) {
+ this.publicKey = publicKey;
+ return this;
+ }
+
+ public byte[] getPrivateKey() {
+ return privateKey;
+ }
+
+ public SshAdaptorParams setPrivateKey(byte[] privateKey) {
+ this.privateKey = privateKey;
+ return this;
+ }
+
+ public String getPassphrase() {
+ return passphrase;
+ }
+
+ public SshAdaptorParams setPassphrase(String passphrase) {
+ this.passphrase = passphrase;
+ return this;
+ }
+
+ public String getKnownHostsFilePath() {
+ return knownHostsFilePath;
+ }
+
+ public SshAdaptorParams setKnownHostsFilePath(String knownHostsFilePath) {
+ this.knownHostsFilePath = knownHostsFilePath;
+ return this;
+ }
+
+ public boolean isStrictHostKeyChecking() {
+ return strictHostKeyChecking;
+ }
+
+ public SshAdaptorParams setStrictHostKeyChecking(boolean strictHostKeyChecking) {
+ this.strictHostKeyChecking = strictHostKeyChecking;
+ return this;
+ }
+
+ public static void main(String args[]) throws IOException {
+ SshAdaptorParams params = new SshAdaptorParams();
+ params.setUserName("dimuthu");
+ params.setPassword("upe");
+ params.setHostName("localhost");
+ params.writeToFile(new File("/tmp/ssh-param.json"));
+ }
+}
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
new file mode 100644
index 0000000..19b429c
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -0,0 +1,430 @@
+package org.apache.airavata.helix.agent.ssh;
+
+import com.jcraft.jsch.*;
+import org.apache.airavata.agents.api.*;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.DBUtil;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ComputeResource;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+import static jdk.nashorn.internal.runtime.regexp.joni.Config.log;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class SshAgentAdaptor implements AgentAdaptor {
+
+ private Session session = null;
+ private AppCatalog appCatalog;
+ private ComputeResourceDescription computeResourceDescription;
+ private ResourceJobManager resourceJobManager;
+ private SSHJobSubmission sshJobSubmission;
+
+ public void init(AdaptorParams adaptorParams) throws AgentException {
+
+ if (adaptorParams instanceof SshAdaptorParams) {
+ SshAdaptorParams params = SshAdaptorParams.class.cast(adaptorParams);
+ JSch jSch = new JSch();
+ try {
+
+ if (params.getPassword() != null) {
+ this.session = jSch.getSession(params.getUserName(), params.getHostName(), params.getPort());
+ session.setPassword(params.getPassword());
+ session.setUserInfo(new SftpUserInfo(params.getPassword()));
+ } else {
+ jSch.addIdentity(UUID.randomUUID().toString(), params.getPrivateKey(), params.getPublicKey(),
+ params.getPassphrase().getBytes());
+ this.session = jSch.getSession(params.getUserName(), params.getHostName(),
+ params.getPort());
+ session.setUserInfo(new DefaultUserInfo(params.getUserName(), null, params.getPassphrase()));
+ }
+
+ if (params.isStrictHostKeyChecking()) {
+ jSch.setKnownHosts(params.getKnownHostsFilePath());
+ } else {
+ session.setConfig("StrictHostKeyChecking", "no");
+ }
+ session.connect(); // 0 connection timeout
+
+ } catch (JSchException e) {
+ throw new AgentException("Could not create ssh session for host " + params.getHostName(), e);
+ }
+ } else {
+ throw new AgentException("Unknown parameter type to ssh initialize agent adaptor. Required SshAdaptorParams type");
+ }
+
+ }
+
+ @Override
+ public void init(String computeResourceId, String gatewayId, String userId, String token) throws AgentException {
+ try {
+ this.appCatalog = RegistryFactory.getAppCatalog();
+ this.computeResourceDescription = this.appCatalog.getComputeResource().getComputeResource(computeResourceId);
+ List<JobSubmissionInterface> jobSubmissionInterfaces = this.computeResourceDescription.getJobSubmissionInterfaces();
+ Optional<JobSubmissionInterface> jobSubmissionInterfaceOp = jobSubmissionInterfaces.stream()
+ .filter(iface -> JobSubmissionProtocol.SSH == iface.getJobSubmissionProtocol() ||
+ JobSubmissionProtocol.SSH_FORK == iface.getJobSubmissionProtocol())
+ .findFirst();
+
+ JobSubmissionInterface jobSubmissionInterface = jobSubmissionInterfaceOp.orElseThrow(() -> new AgentException("Could not find a Job submission interface with SSH"));
+
+ this.sshJobSubmission = this.appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+ this.resourceJobManager = sshJobSubmission.getResourceJobManager();
+
+ String jdbcUrl = ServerSettings.getCredentialStoreDBURL();
+ String jdbcUsr = ServerSettings.getCredentialStoreDBUser();
+ String jdbcPass = ServerSettings.getCredentialStoreDBPassword();
+ String driver = ServerSettings.getCredentialStoreDBDriver();
+ CredentialReaderImpl credentialReader = new CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass, driver));
+ Credential credential = credentialReader.getCredential(gatewayId, token);
+
+ if (credential instanceof SSHCredential) {
+ SSHCredential sshCredential = SSHCredential.class.cast(credential);
+ SshAdaptorParams adaptorParams = new SshAdaptorParams();
+ adaptorParams.setHostName(this.computeResourceDescription.getHostName());
+ adaptorParams.setUserName(userId);
+ adaptorParams.setPassphrase(sshCredential.getPassphrase());
+ adaptorParams.setPrivateKey(sshCredential.getPrivateKey());
+ adaptorParams.setPublicKey(sshCredential.getPublicKey());
+ adaptorParams.setStrictHostKeyChecking(false);
+ init(adaptorParams);
+ }
+
+ } catch (AppCatalogException e) {
+ e.printStackTrace();
+ throw new AgentException(e);
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ throw new AgentException(e);
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ throw new AgentException(e);
+ } catch (InstantiationException e) {
+ e.printStackTrace();
+ throw new AgentException(e);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ throw new AgentException(e);
+ } catch (CredentialStoreException e) {
+ e.printStackTrace();
+ throw new AgentException(e);
+ }
+ }
+
+ public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
+ StandardOutReader commandOutput = new StandardOutReader();
+ try {
+ ChannelExec channelExec = ((ChannelExec) session.openChannel("exec"));
+ channelExec.setCommand(command);
+ channelExec.setInputStream(null);
+ channelExec.setErrStream(commandOutput.getStandardError());
+ channelExec.connect();
+ commandOutput.onOutput(channelExec);
+ return commandOutput;
+ } catch (JSchException e) {
+ throw new AgentException(e);
+ }
+ }
+
+ public void createDirectory(String path) throws AgentException {
+ try {
+ String command = "mkdir -p " + path;
+ Channel channel = session.openChannel("exec");
+ StandardOutReader stdOutReader = new StandardOutReader();
+
+ ((ChannelExec) channel).setCommand(command);
+
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ try {
+ channel.connect();
+ } catch (JSchException e) {
+
+ channel.disconnect();
+ System.out.println("Unable to retrieve command output. Command - " + command +
+ " on server - " + session.getHost() + ":" + session.getPort() +
+ " connecting user name - "
+ + session.getUserName());
+ throw new AgentException(e);
+ }
+ stdOutReader.onOutput(channel);
+ if (stdOutReader.getStdErrorString().contains("mkdir:")) {
+ throw new AgentException(stdOutReader.getStdErrorString());
+ }
+
+ channel.disconnect();
+ } catch (JSchException e) {
+ throw new AgentException(e);
+ }
+ }
+
+ public void copyFile(String localFile, String remoteFile) throws AgentException {
+ FileInputStream fis = null;
+ String prefix = null;
+ if (new File(localFile).isDirectory()) {
+ prefix = localFile + File.separator;
+ }
+ boolean ptimestamp = true;
+
+ try {
+ // exec 'scp -t rfile' remotely
+ String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile;
+ Channel channel = session.openChannel("exec");
+
+ StandardOutReader stdOutReader = new StandardOutReader();
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ ((ChannelExec) channel).setCommand(command);
+
+ // get I/O streams for remote scp
+ OutputStream out = channel.getOutputStream();
+ InputStream in = channel.getInputStream();
+
+ channel.connect();
+
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ //log.error(error);
+ throw new AgentException(error);
+ }
+
+ File _lfile = new File(localFile);
+
+ if (ptimestamp) {
+ command = "T" + (_lfile.lastModified() / 1000) + " 0";
+ // The access time should be sent here,
+ // but it is not accessible with JavaAPI ;-<
+ command += (" " + (_lfile.lastModified() / 1000) + " 0\n");
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ throw new AgentException(error);
+ }
+ }
+
+ // send "C0644 filesize filename", where filename should not include '/'
+ long filesize = _lfile.length();
+ command = "C0644 " + filesize + " ";
+ if (localFile.lastIndexOf('/') > 0) {
+ command += localFile.substring(localFile.lastIndexOf('/') + 1);
+ } else {
+ command += localFile;
+ }
+ command += "\n";
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ //log.error(error);
+ throw new AgentException(error);
+ }
+
+ // send a content of localFile
+ fis = new FileInputStream(localFile);
+ byte[] buf = new byte[1024];
+ while (true) {
+ int len = fis.read(buf, 0, buf.length);
+ if (len <= 0) break;
+ out.write(buf, 0, len); //out.flush();
+ }
+ fis.close();
+ fis = null;
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ //log.error(error);
+ throw new AgentException(error);
+ }
+ out.close();
+ stdOutReader.onOutput(channel);
+
+
+ channel.disconnect();
+ if (stdOutReader.getStdErrorString().contains("scp:")) {
+ throw new AgentException(stdOutReader.getStdErrorString());
+ }
+ //since remote file is always a file we just return the file
+ //return remoteFile;
+ } catch (JSchException e) {
+ e.printStackTrace();
+ throw new AgentException(e);
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ throw new AgentException(e);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new AgentException(e);
+ }
+ }
+
+ @Override
+ public List<String> listDirectory(String path) throws AgentException {
+
+ try {
+ String command = "ls " + path;
+ Channel channel = session.openChannel("exec");
+ StandardOutReader stdOutReader = new StandardOutReader();
+
+ ((ChannelExec) channel).setCommand(command);
+
+
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ try {
+ channel.connect();
+ } catch (JSchException e) {
+
+ channel.disconnect();
+// session.disconnect();
+
+ throw new AgentException("Unable to retrieve command output. Command - " + command +
+ " on server - " + session.getHost() + ":" + session.getPort() +
+ " connecting user name - "
+ + session.getUserName(), e);
+ }
+ stdOutReader.onOutput(channel);
+ stdOutReader.getStdOutputString();
+ if (stdOutReader.getStdErrorString().contains("ls:")) {
+ throw new AgentException(stdOutReader.getStdErrorString());
+ }
+ channel.disconnect();
+ return Arrays.asList(stdOutReader.getStdOutputString().split("\n"));
+
+ } catch (JSchException e) {
+ throw new AgentException(e);
+ }
+ }
+
+ private static class DefaultUserInfo implements UserInfo, UIKeyboardInteractive {
+
+ private String userName;
+ private String password;
+ private String passphrase;
+
+ public DefaultUserInfo(String userName, String password, String passphrase) {
+ this.userName = userName;
+ this.password = password;
+ this.passphrase = passphrase;
+ }
+
+ @Override
+ public String getPassphrase() {
+ return passphrase;
+ }
+
+ @Override
+ public String getPassword() {
+ return password;
+ }
+
+ @Override
+ public boolean promptPassword(String s) {
+ return true;
+ }
+
+ @Override
+ public boolean promptPassphrase(String s) {
+ return false;
+ }
+
+ @Override
+ public boolean promptYesNo(String s) {
+ return false;
+ }
+
+ @Override
+ public void showMessage(String s) {
+
+ }
+
+ @Override
+ public String[] promptKeyboardInteractive(String destination, String name, String instruction, String[] prompt, boolean[] echo) {
+ return new String[0];
+ }
+ }
+
+ class SftpUserInfo implements UserInfo {
+
+ String password = null;
+
+ public SftpUserInfo(String password) {
+ this.password = password;
+ }
+
+ @Override
+ public String getPassphrase() {
+ return null;
+ }
+
+ @Override
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String passwd) {
+ password = passwd;
+ }
+
+ @Override
+ public boolean promptPassphrase(String message) {
+ return false;
+ }
+
+ @Override
+ public boolean promptPassword(String message) {
+ return false;
+ }
+
+ @Override
+ public boolean promptYesNo(String message) {
+ return true;
+ }
+
+ @Override
+ public void showMessage(String message) {
+ }
+ }
+
+ static int checkAck(InputStream in) throws IOException {
+ int b = in.read();
+ if (b == 0) return b;
+ if (b == -1) return b;
+
+ if (b == 1 || b == 2) {
+ StringBuffer sb = new StringBuffer();
+ int c;
+ do {
+ c = in.read();
+ sb.append((char) c);
+ }
+ while (c != '\n');
+ //FIXME: Redundant
+ if (b == 1) { // error
+ System.out.print(sb.toString());
+ }
+ if (b == 2) { // fatal error
+ System.out.print(sb.toString());
+ }
+ //log.warn(sb.toString());
+ }
+ return b;
+ }
+}
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java
new file mode 100644
index 0000000..49c036e
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java
@@ -0,0 +1,83 @@
+package org.apache.airavata.helix.agent.ssh;
+
+import com.jcraft.jsch.Channel;
+import org.apache.airavata.agents.api.CommandOutput;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class StandardOutReader implements CommandOutput {
+
+ // Todo improve this. We need to direct access of std out and exit code
+
+ String stdOutputString = null;
+ ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
+ private int exitCode;
+
+ public void onOutput(Channel channel) {
+ try {
+ StringBuffer pbsOutput = new StringBuffer("");
+ InputStream inputStream = channel.getInputStream();
+ byte[] tmp = new byte[1024];
+ do {
+ while (inputStream.available() > 0) {
+ int i = inputStream.read(tmp, 0, 1024);
+ if (i < 0) break;
+ pbsOutput.append(new String(tmp, 0, i));
+ }
+ } while (!channel.isClosed()) ;
+ String output = pbsOutput.toString();
+ this.setStdOutputString(output);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void exitCode(int code) {
+ System.out.println("Program exit code - " + code);
+ this.exitCode = code;
+ }
+
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ public String getStdOutputString() {
+ return stdOutputString;
+ }
+
+ public void setStdOutputString(String stdOutputString) {
+ this.stdOutputString = stdOutputString;
+ }
+
+ public String getStdErrorString() {
+ return errorStream.toString();
+ }
+
+ public OutputStream getStandardError() {
+ return errorStream;
+ }
+
+ @Override
+ public String getStdOut() {
+ return null;
+ }
+
+ @Override
+ public String getStdError() {
+ return null;
+ }
+
+ @Override
+ public String getExitCommand() {
+ return null;
+ }
+}
diff --git a/modules/airavata-helix/pom.xml b/modules/airavata-helix/pom.xml
new file mode 100644
index 0000000..05938fd
--- /dev/null
+++ b/modules/airavata-helix/pom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <packaging>pom</packaging>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>airavata-helix</artifactId>
+
+ <modules>
+ <module>agent-api</module>
+ <module>agent-impl</module>
+ <module>task-api</module>
+ <module>task-core</module>
+ <module>workflow-impl</module>
+ </modules>
+
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/task-api/pom.xml b/modules/airavata-helix/task-api/pom.xml
new file mode 100644
index 0000000..41ec00c
--- /dev/null
+++ b/modules/airavata-helix/task-api/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-helix</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>task-api</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>0.6.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>agent-api</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <!--<build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>-->
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java
new file mode 100644
index 0000000..07de06e
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java
@@ -0,0 +1,13 @@
+package org.apache.airavata.helix.task.api;
+
+import org.apache.airavata.helix.task.api.support.AdaptorSupport;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public interface TaskHelper {
+ public AdaptorSupport getAdaptorSupport();
+}
diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskDef.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskDef.java
new file mode 100644
index 0000000..3e4b7f1
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskDef.java
@@ -0,0 +1,18 @@
+package org.apache.airavata.helix.task.api.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TaskDef {
+ public String name();
+}
diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskOutPort.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskOutPort.java
new file mode 100644
index 0000000..a22c387
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskOutPort.java
@@ -0,0 +1,18 @@
+package org.apache.airavata.helix.task.api.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface TaskOutPort {
+ public String name();
+}
diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskParam.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskParam.java
new file mode 100644
index 0000000..198b172
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskParam.java
@@ -0,0 +1,20 @@
+package org.apache.airavata.helix.task.api.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface TaskParam {
+ public String name();
+ public String defaultValue() default "";
+ public boolean mandatory() default false;
+}
diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
new file mode 100644
index 0000000..3e24aaa
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
@@ -0,0 +1,52 @@
+package org.apache.airavata.helix.task.api.support;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.agents.api.JobSubmissionOutput;
+
+import java.io.File;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public interface AdaptorSupport {
+ public void initializeAdaptor();
+
+ public AgentAdaptor fetchAdaptor(String computeResource, String protocol, String authToken) throws Exception;
+
+
+ /**
+ *
+ * @param command
+ * @param workingDirectory
+ * @param computeResourceId
+ * @param protocol
+ * @param authToken
+ * @throws Exception
+ */
+ public CommandOutput executeCommand(String command, String workingDirectory, String computeResourceId, String protocol, String authToken) throws Exception;
+
+ /**
+ *
+ * @param path
+ * @param computeResourceId
+ * @param protocol
+ * @param authToken
+ * @throws Exception
+ */
+ public void createDirectory(String path, String computeResourceId, String protocol, String authToken) throws Exception;
+
+ /**
+ *
+ * @param sourceFile
+ * @param destinationFile
+ * @param computeResourceId
+ * @param protocol
+ * @param authToken
+ * @throws Exception
+ */
+ public void copyFile(String sourceFile, String destinationFile, String computeResourceId, String protocol, String authToken) throws Exception;
+}
diff --git a/modules/airavata-helix/task-api/src/main/resources/application.properties b/modules/airavata-helix/task-api/src/main/resources/application.properties
new file mode 100644
index 0000000..733515f
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/resources/application.properties
@@ -0,0 +1,3 @@
+zookeeper.connection.url=localhost:2199
+helix.cluster.name=AiravataDemoCluster
+helix.controller.name=controller-1
\ No newline at end of file
diff --git a/modules/airavata-helix/task-api/src/main/resources/log4j.properties b/modules/airavata-helix/task-api/src/main/resources/log4j.properties
new file mode 100644
index 0000000..5e31e3c
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
diff --git a/modules/airavata-helix/task-core/pom.xml b/modules/airavata-helix/task-core/pom.xml
new file mode 100644
index 0000000..df72dac
--- /dev/null
+++ b/modules/airavata-helix/task-core/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-helix</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>task-core</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>0.6.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>task-api</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>agent-api</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <!--<build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>-->
+
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
new file mode 100644
index 0000000..04fa37f
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
@@ -0,0 +1,108 @@
+package org.apache.airavata.helix.core;
+
+import org.apache.airavata.helix.core.util.TaskUtil;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.helix.HelixManager;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public abstract class AbstractTask extends UserContentStore implements Task {
+
+ private static final String NEXT_JOB = "next-job";
+ private static final String WORKFLOW_STARTED = "workflow-started";
+
+ @TaskParam(name = "taskId")
+ private String taskId;
+
+ private TaskCallbackContext callbackContext;
+ private TaskHelper taskHelper;
+
+ @Override
+ public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
+ super.init(manager, workflowName, jobName, taskName);
+ try {
+ TaskUtil.deserializeTaskData(this, this.callbackContext.getTaskConfig().getConfigMap());
+ } catch (IllegalAccessException | InstantiationException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public final TaskResult run() {
+ boolean isThisNextJob = getUserContent(WORKFLOW_STARTED, Scope.WORKFLOW) == null ||
+ this.callbackContext.getJobConfig().getJobId()
+ .equals(this.callbackContext.getJobConfig().getWorkflow() + "_" + getUserContent(NEXT_JOB, Scope.WORKFLOW));
+ if (isThisNextJob) {
+ return onRun(this.taskHelper);
+ } else {
+ return new TaskResult(TaskResult.Status.COMPLETED, "Not a target job");
+ }
+ }
+
+ @Override
+ public final void cancel() {
+ onCancel();
+ }
+
+ public abstract TaskResult onRun(TaskHelper helper);
+
+ public abstract void onCancel();
+
+ protected void publishErrors(Throwable e) {
+ // TODO Publish through kafka channel with task and workflow id
+ e.printStackTrace();
+ }
+
+ public void sendNextJob(String jobId) {
+ putUserContent(WORKFLOW_STARTED, "TRUE", Scope.WORKFLOW);
+ if (jobId != null) {
+ putUserContent(NEXT_JOB, jobId, Scope.WORKFLOW);
+ }
+ }
+
+ protected void setContextVariable(String key, String value) {
+ putUserContent(key, value, Scope.WORKFLOW);
+ }
+
+ protected String getContextVariable(String key) {
+ return getUserContent(key, Scope.WORKFLOW);
+ }
+
+ // Getters and setters
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public AbstractTask setTaskId(String taskId) {
+ this.taskId = taskId;
+ return this;
+ }
+
+ public TaskCallbackContext getCallbackContext() {
+ return callbackContext;
+ }
+
+ public AbstractTask setCallbackContext(TaskCallbackContext callbackContext) {
+ this.callbackContext = callbackContext;
+ return this;
+ }
+
+ public TaskHelper getTaskHelper() {
+ return taskHelper;
+ }
+
+ public AbstractTask setTaskHelper(TaskHelper taskHelper) {
+ this.taskHelper = taskHelper;
+ return this;
+ }
+}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/OutPort.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/OutPort.java
new file mode 100644
index 0000000..99dc37c
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/OutPort.java
@@ -0,0 +1,44 @@
+package org.apache.airavata.helix.core;
+
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class OutPort {
+
+ private String nextJobId;
+ private AbstractTask task;
+
+ public OutPort(String nextJobId, AbstractTask task) {
+ this.nextJobId = nextJobId;
+ this.task = task;
+ }
+
+ public TaskResult invoke(TaskResult taskResult) {
+ task.sendNextJob(nextJobId);
+ return taskResult;
+ }
+
+ public String getNextJobId() {
+ return nextJobId;
+ }
+
+ public OutPort setNextJobId(String nextJobId) {
+ this.nextJobId = nextJobId;
+ return this;
+ }
+
+ public AbstractTask getTask() {
+ return task;
+ }
+
+ public OutPort setTask(AbstractTask task) {
+ this.task = task;
+ return this;
+ }
+}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/controller/HelixController.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/controller/HelixController.java
new file mode 100644
index 0000000..cdc27f7
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/controller/HelixController.java
@@ -0,0 +1,91 @@
+package org.apache.airavata.helix.core.controller;
+
+import org.apache.airavata.helix.core.util.PropertyResolver;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class HelixController implements Runnable {
+
+ private static final Logger logger = LogManager.getLogger(HelixController.class);
+
+ private String clusterName;
+ private String controllerName;
+ private String zkAddress;
+ private org.apache.helix.HelixManager zkHelixManager;
+
+ private CountDownLatch startLatch = new CountDownLatch(1);
+ private CountDownLatch stopLatch = new CountDownLatch(1);
+
+ public HelixController(String propertyFile) throws IOException {
+
+ PropertyResolver propertyResolver = new PropertyResolver();
+ propertyResolver.loadInputStream(this.getClass().getClassLoader().getResourceAsStream(propertyFile));
+
+ this.clusterName = propertyResolver.get("helix.cluster.name");
+ this.controllerName = propertyResolver.get("helix.controller.name");
+ this.zkAddress = propertyResolver.get("zookeeper.connection.url");
+ }
+
+ public void run() {
+ try {
+ zkHelixManager = HelixControllerMain.startHelixController(zkAddress, clusterName,
+ controllerName, HelixControllerMain.STANDALONE);
+ startLatch.countDown();
+ stopLatch.await();
+ } catch (Exception ex) {
+ logger.error("Error in run() for Controller: " + controllerName + ", reason: " + ex, ex);
+ } finally {
+ disconnect();
+ }
+ }
+
+ public void start() {
+ new Thread(this).start();
+ try {
+ startLatch.await();
+ logger.info("Controller: " + controllerName + ", has connected to cluster: " + clusterName);
+
+ Runtime.getRuntime().addShutdownHook(
+ new Thread() {
+ @Override
+ public void run() {
+ disconnect();
+ }
+ }
+ );
+
+ } catch (InterruptedException ex) {
+ logger.error("Controller: " + controllerName + ", is interrupted! reason: " + ex, ex);
+ }
+ }
+
+ public void stop() {
+ stopLatch.countDown();
+ }
+
+ private void disconnect() {
+ if (zkHelixManager != null) {
+ logger.info("Controller: " + controllerName + ", has disconnected from cluster: " + clusterName);
+ zkHelixManager.disconnect();
+ }
+ }
+
+ public static void main(String args[]) {
+ try {
+ HelixController helixController = new HelixController("application.properties");
+ helixController.start();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
new file mode 100644
index 0000000..190b866
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
@@ -0,0 +1,171 @@
+package org.apache.airavata.helix.core.participant;
+
+import org.apache.airavata.helix.core.support.TaskHelperImpl;
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.core.util.PropertyResolver;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class HelixParticipant <T extends AbstractTask> implements Runnable {
+
+ private static final Logger logger = LogManager.getLogger(HelixParticipant.class);
+
+ private String zkAddress;
+ private String clusterName;
+ private String participantName;
+ private ZKHelixManager zkHelixManager;
+ private String taskTypeName;
+ private PropertyResolver propertyResolver;
+ private Class<T> taskClass;
+
+ public HelixParticipant(String propertyFile, Class<T> taskClass, String taskTypeName) throws IOException {
+
+ logger.info("Initializing Participant Node");
+
+ this.propertyResolver = new PropertyResolver();
+ propertyResolver.loadInputStream(this.getClass().getClassLoader().getResourceAsStream(propertyFile));
+
+ this.zkAddress = propertyResolver.get("zookeeper.connection.url");
+ this.clusterName = propertyResolver.get("helix.cluster.name");
+ this.participantName = propertyResolver.get("participant.name");
+ this.taskTypeName = taskTypeName;
+ this.taskClass = taskClass;
+
+ logger.info("Zookeper connection url " + zkAddress);
+ logger.info("Cluster name " + clusterName);
+ logger.info("Participant name " + participantName);
+ logger.info("Task type " + taskTypeName);
+ if (taskClass != null) {
+ logger.info("Task class " + taskClass.getCanonicalName());
+ }
+ }
+
+ public Map<String, TaskFactory> getTaskFactory() {
+ Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+
+ TaskFactory taskFac = new TaskFactory() {
+ public Task createNewTask(TaskCallbackContext context) {
+ try {
+ return taskClass.newInstance()
+ .setCallbackContext(context)
+ .setTaskHelper(new TaskHelperImpl());
+ } catch (InstantiationException | IllegalAccessException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+ };
+
+ TaskDef taskDef = taskClass.getAnnotation(TaskDef.class);
+ taskRegistry.put(taskDef.name(), taskFac);
+
+ return taskRegistry;
+ }
+
+ public void run() {
+ ZkClient zkClient = null;
+ try {
+ zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
+
+ List<String> nodesInCluster = zkHelixAdmin.getInstancesInCluster(clusterName);
+
+ if (!nodesInCluster.contains(participantName)) {
+ InstanceConfig instanceConfig = new InstanceConfig(participantName);
+ instanceConfig.setHostName("localhost");
+ instanceConfig.setInstanceEnabled(true);
+ if (taskTypeName != null) {
+ instanceConfig.addTag(taskTypeName);
+ }
+ zkHelixAdmin.addInstance(clusterName, instanceConfig);
+ logger.debug("Instance: " + participantName + ", has been added to cluster: " + clusterName);
+ } else {
+ if (taskTypeName != null) {
+ zkHelixAdmin.addInstanceTag(clusterName, participantName, taskTypeName);
+ }
+ }
+
+ Runtime.getRuntime().addShutdownHook(
+ new Thread() {
+ @Override
+ public void run() {
+ logger.debug("Participant: " + participantName + ", shutdown hook called.");
+ disconnect();
+ }
+ }
+ );
+
+ // connect the participant manager
+ connect();
+ } catch (Exception ex) {
+ logger.error("Error in run() for Participant: " + participantName + ", reason: " + ex, ex);
+ } finally {
+ if (zkClient != null) {
+ zkClient.close();
+ }
+ }
+ }
+
+ private void connect() {
+ try {
+ zkHelixManager = new ZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, zkAddress);
+ // register online-offline model
+ StateMachineEngine machineEngine = zkHelixManager.getStateMachineEngine();
+ OnlineOfflineStateModelFactory factory = new OnlineOfflineStateModelFactory(participantName);
+ machineEngine.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), factory);
+
+ // register task model
+ machineEngine.registerStateModelFactory("Task", new TaskStateModelFactory(zkHelixManager, getTaskFactory()));
+ logger.debug("Participant: " + participantName + ", registered state model factories.");
+
+ zkHelixManager.connect();
+ logger.info("Participant: " + participantName + ", has connected to cluster: " + clusterName);
+
+ Thread.currentThread().join();
+ } catch (InterruptedException ex) {
+ logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex);
+ }
+ catch (Exception ex) {
+ logger.error("Error in connect() for Participant: " + participantName + ", reason: " + ex, ex);
+ } finally {
+ disconnect();
+ }
+ }
+
+ private void disconnect() {
+ if (zkHelixManager != null) {
+ logger.info("Participant: " + participantName + ", has disconnected from cluster: " + clusterName);
+ zkHelixManager.disconnect();
+ }
+ }
+
+ public PropertyResolver getPropertyResolver() {
+ return propertyResolver;
+ }
+}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
new file mode 100644
index 0000000..87a1e17
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
@@ -0,0 +1,47 @@
+package org.apache.airavata.helix.core.support;
+
+import org.apache.airavata.agents.api.*;
+import org.apache.airavata.helix.task.api.support.AdaptorSupport;
+
+import java.io.File;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class AdaptorSupportImpl implements AdaptorSupport {
+
+ private static AdaptorSupportImpl INSTANCE;
+
+ private final AgentStore agentStore = new AgentStore();
+
+ private AdaptorSupportImpl() {}
+
+ public synchronized static AdaptorSupportImpl getInstance() {
+ if (INSTANCE == null) {
+ INSTANCE = new AdaptorSupportImpl();
+ }
+ return INSTANCE;
+ }
+
+ public void initializeAdaptor() {
+ }
+
+ public CommandOutput executeCommand(String command, String workingDirectory, String computeResourceId, String protocol, String authToken) throws AgentException {
+ return fetchAdaptor(computeResourceId, protocol, authToken).executeCommand(command, workingDirectory);
+ }
+
+ public void createDirectory(String path, String computeResourceId, String protocol, String authToken) throws AgentException {
+ fetchAdaptor(computeResourceId, protocol, authToken).createDirectory(path);
+ }
+
+ public void copyFile(String sourceFile, String destinationFile, String computeResourceId, String protocol, String authToken) throws AgentException {
+ fetchAdaptor(computeResourceId, protocol, authToken).copyFile(sourceFile, destinationFile);
+ }
+
+ public AgentAdaptor fetchAdaptor(String computeResource, String protocol, String authToken) throws AgentException {
+ return agentStore.fetchAdaptor(computeResource, protocol, authToken);
+ }
+}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java
new file mode 100644
index 0000000..77fc5ce
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java
@@ -0,0 +1,16 @@
+package org.apache.airavata.helix.core.support;
+
+import org.apache.airavata.helix.task.api.TaskHelper;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class TaskHelperImpl implements TaskHelper {
+
+ public AdaptorSupportImpl getAdaptorSupport() {
+ return AdaptorSupportImpl.getInstance();
+ }
+}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/PropertyResolver.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/PropertyResolver.java
new file mode 100644
index 0000000..4532345
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/PropertyResolver.java
@@ -0,0 +1,44 @@
+package org.apache.airavata.helix.core.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class PropertyResolver {
+ private Properties properties = new Properties();
+
+ public void loadFromFile(File propertyFile) throws IOException {
+ properties = new Properties();
+ properties.load(new FileInputStream(propertyFile));
+ }
+
+ public void loadInputStream(InputStream inputStream) throws IOException {
+ properties = new Properties();
+ properties.load(inputStream);
+ }
+
+ public String get(String key) {
+ if (properties.containsKey(key)) {
+ if (System.getenv(key.replace(".", "_")) != null) {
+ return System.getenv(key.replace(".", "_"));
+ } else {
+ return properties.getProperty(key);
+ }
+ } else {
+ return null;
+ }
+ }
+
+ public String get(String key, String defaultValue) {
+ return Optional.ofNullable(get(key)).orElse(defaultValue);
+ }
+}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
new file mode 100644
index 0000000..d0f1ab6
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
@@ -0,0 +1,103 @@
+package org.apache.airavata.helix.core.util;
+
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
+import org.apache.airavata.helix.task.api.annotation.TaskParam;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class TaskUtil {
+
+ public static <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T task) throws IllegalAccessException {
+ Field[] fields = task.getClass().getDeclaredFields();
+ List<OutPort> outPorts = new ArrayList<>();
+ for (Field field : fields) {
+ TaskOutPort outPortAnnotation = field.getAnnotation(TaskOutPort.class);
+ if (outPortAnnotation != null) {
+ field.setAccessible(true);
+ OutPort outPort = (OutPort) field.get(task);
+ outPorts.add(outPort);
+ }
+ }
+ return outPorts;
+ }
+
+ public static <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException {
+
+ Map<String, String> result = new HashMap<>();
+ for (Class<?> c = data.getClass(); c != null; c = c.getSuperclass()) {
+ Field[] fields = c.getDeclaredFields();
+ for (Field classField : fields) {
+ TaskParam parm = classField.getAnnotation(TaskParam.class);
+ if (parm != null) {
+ classField.setAccessible(true);
+ result.put(parm.name(), classField.get(data).toString());
+ }
+
+ TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
+ if (outPort != null) {
+ classField.setAccessible(true);
+ if (classField.get(data) != null) {
+ result.put(outPort.name(), ((OutPort) classField.get(data)).getNextJobId().toString());
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ public static <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, InstantiationException {
+
+ List<Field> allFields = new ArrayList<>();
+ Class genericClass = instance.getClass();
+
+ while (AbstractTask.class.isAssignableFrom(genericClass)) {
+ Field[] declaredFields = genericClass.getDeclaredFields();
+ for (Field declaredField : declaredFields) {
+ allFields.add(declaredField);
+ }
+ genericClass = genericClass.getSuperclass();
+ }
+
+ for (Field classField : allFields) {
+ TaskParam param = classField.getAnnotation(TaskParam.class);
+ if (param != null) {
+ if (params.containsKey(param.name())) {
+ classField.setAccessible(true);
+ if (classField.getType().isAssignableFrom(String.class)) {
+ classField.set(instance, params.get(param.name()));
+ } else if (classField.getType().isAssignableFrom(Integer.class)) {
+ classField.set(instance, Integer.parseInt(params.get(param.name())));
+ } else if (classField.getType().isAssignableFrom(Long.class)) {
+ classField.set(instance, Long.parseLong(params.get(param.name())));
+ } else if (classField.getType().isAssignableFrom(Boolean.class)) {
+ classField.set(instance, Boolean.parseBoolean(params.get(param.name())));
+ }
+ }
+ }
+ }
+
+ for (Field classField : allFields) {
+ TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
+ if (outPort != null) {
+ classField.setAccessible(true);
+ if (params.containsKey(outPort.name())) {
+ classField.set(instance, new OutPort(params.get(outPort.name()), instance));
+ } else {
+ classField.set(instance, new OutPort(null, instance));
+ }
+ }
+ }
+ }
+}
diff --git a/modules/airavata-helix/workflow-impl/pom.xml b/modules/airavata-helix/workflow-impl/pom.xml
new file mode 100644
index 0000000..03f324e
--- /dev/null
+++ b/modules/airavata-helix/workflow-impl/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-helix</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>workflow-impl</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>task-core</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>task-api</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+<!--
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+-->
+
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/SimpleWorkflow.java b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/SimpleWorkflow.java
new file mode 100644
index 0000000..d212f91
--- /dev/null
+++ b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/SimpleWorkflow.java
@@ -0,0 +1,40 @@
+package org.apache.airavata.helix.workflow;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class SimpleWorkflow {
+
+ public static void main(String[] args) throws Exception {
+ WorkflowManager wm = new WorkflowManager("AiravataDemoCluster", "WorkflowManager", "localhost:2199");
+
+ /*MkdirTask mkdirTask1 = new MkdirTask();
+ mkdirTask1.setDirName("/tmp/newdir");
+ mkdirTask1.setComputeResourceId("localhost");
+ mkdirTask1.setTaskId("task1");
+
+ MkdirTask mkdirTask2 = new MkdirTask();
+ mkdirTask2.setDirName("/tmp/newdir2");
+ mkdirTask2.setComputeResourceId("localhost");
+ mkdirTask2.setTaskId("task2");
+
+ CommandTask commandTask1 = new CommandTask();
+ commandTask1.setCommand("touch /tmp/newdir/a.txt");
+ commandTask1.setWorkingDirectory("/tmp");
+ commandTask1.setComputeResource("localhost");
+ commandTask1.setTaskId("task3");
+
+ mkdirTask1.setSuccessPort(new OutPort("task2", mkdirTask1));
+ mkdirTask2.setSuccessPort(new OutPort("task3", mkdirTask2));
+
+ List<AbstractTask> allTasks = new ArrayList<>();
+ allTasks.add(mkdirTask2);
+ allTasks.add(mkdirTask1);
+ allTasks.add(commandTask1);
+
+ wm.launchWorkflow(UUID.randomUUID().toString(), allTasks);*/
+ }
+}
diff --git a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
new file mode 100644
index 0000000..ab7e3c4
--- /dev/null
+++ b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
@@ -0,0 +1,94 @@
+package org.apache.airavata.helix.workflow;
+
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.core.util.*;
+import org.apache.airavata.helix.core.util.TaskUtil;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class WorkflowManager {
+
+ private static final String WORKFLOW_PREFIX = "Workflow_of_process_";
+ private TaskDriver taskDriver;
+
+ public WorkflowManager(String helixClusterName, String instanceName, String zkConnectionString) throws Exception {
+
+ HelixManager helixManager = HelixManagerFactory.getZKHelixManager(helixClusterName, instanceName,
+ InstanceType.SPECTATOR, zkConnectionString);
+ helixManager.connect();
+
+ Runtime.getRuntime().addShutdownHook(
+ new Thread() {
+ @Override
+ public void run() {
+ helixManager.disconnect();
+ }
+ }
+ );
+
+ taskDriver = new TaskDriver(helixManager);
+ }
+
+ public void launchWorkflow(String processId, List<AbstractTask> tasks, boolean globalParticipant) throws Exception {
+
+ Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW_PREFIX + processId).setExpiry(0);
+
+ for (int i = 0; i < tasks.size(); i++) {
+ AbstractTask data = tasks.get(i);
+ String taskType = data.getClass().getAnnotation(TaskDef.class).name();
+ TaskConfig.Builder taskBuilder = new TaskConfig.Builder().setTaskId("Task_" + data.getTaskId())
+ .setCommand(taskType);
+ Map<String, String> paramMap = org.apache.airavata.helix.core.util.TaskUtil.serializeTaskData(data);
+ paramMap.forEach(taskBuilder::addConfig);
+
+ List<TaskConfig> taskBuilds = new ArrayList<>();
+ taskBuilds.add(taskBuilder.build());
+
+ JobConfig.Builder job = new JobConfig.Builder()
+ .addTaskConfigs(taskBuilds)
+ .setFailureThreshold(0)
+ .setMaxAttemptsPerTask(3);
+
+ if (!globalParticipant) {
+ job.setInstanceGroupTag(taskType);
+ }
+
+ workflowBuilder.addJob((data.getTaskId()), job);
+
+ List<OutPort> outPorts = TaskUtil.getOutPortsOfTask(data);
+ outPorts.forEach(outPort -> {
+ if (outPort != null) {
+ workflowBuilder.addParentChildDependency(data.getTaskId(), outPort.getNextJobId());
+ }
+ });
+ }
+
+ WorkflowConfig.Builder config = new WorkflowConfig.Builder().setFailureThreshold(0);
+ workflowBuilder.setWorkflowConfig(config.build());
+ Workflow workflow = workflowBuilder.build();
+
+ taskDriver.start(workflow);
+
+ //TODO : Do we need to monitor workflow status? If so how do we do it in a scalable manner? For example,
+ // if the hfac that monitors a particular workflow, got killed due to some reason, who is taking the responsibility
+
+ TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(),
+ TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED);
+ System.out.println("Workflow finished with state " + taskState.name());
+
+ }
+}
\ No newline at end of file
diff --git a/modules/helix-spectator/pom.xml b/modules/helix-spectator/pom.xml
new file mode 100644
index 0000000..bae2785
--- /dev/null
+++ b/modules/helix-spectator/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>helix-spectator</artifactId>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>task-core</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-registry-cpi</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-registry-core</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-messaging-core</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>workflow-impl</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-client -->
+ <dependency>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ <version>1.1.7</version>
+ </dependency>
+
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
new file mode 100644
index 0000000..f0e166b
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -0,0 +1,68 @@
+package org.apache.airavata.helix.impl.participant;
+
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.core.participant.HelixParticipant;
+import org.apache.airavata.helix.core.support.TaskHelperImpl;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class GlobalParticipant extends HelixParticipant {
+
+ private String[] taskClasses = {
+ "org.apache.airavata.helix.impl.task.EnvSetupTask",
+ "org.apache.airavata.helix.impl.task.DataStagingTask",
+ "org.apache.airavata.helix.impl.task.submission.task.ForkJobSubmissionTask",
+ "org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask",
+ "org.apache.airavata.helix.impl.task.submission.task.LocalJobSubmissionTask"
+ };
+
+ public Map<String, TaskFactory> getTaskFactory() {
+ Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+
+ for (String taskClass : taskClasses) {
+ TaskFactory taskFac = new TaskFactory() {
+ public Task createNewTask(TaskCallbackContext context) {
+ try {
+ return AbstractTask.class.cast(Class.forName(taskClass).newInstance())
+ .setCallbackContext(context)
+ .setTaskHelper(new TaskHelperImpl());
+ } catch (InstantiationException | IllegalAccessException e) {
+ e.printStackTrace();
+ return null;
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+ };
+
+ TaskDef taskDef = null;
+ try {
+ taskDef = Class.forName(taskClass).getAnnotation(TaskDef.class);
+ taskRegistry.put(taskDef.name(), taskFac);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+
+ return taskRegistry;
+ }
+
+ public GlobalParticipant(String propertyFile, Class taskClass, String taskTypeName) throws IOException {
+ super(propertyFile, taskClass, taskTypeName);
+ }
+
+ public static void main(String args[]) throws IOException {
+ GlobalParticipant participant = new GlobalParticipant("application.properties", null, null);
+ Thread t = new Thread(participant);
+ t.start();
+ }
+
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
new file mode 100644
index 0000000..72d3e17
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -0,0 +1,293 @@
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
+import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.helix.HelixManager;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.*;
+
+public abstract class AiravataTask extends AbstractTask {
+
+ private static final Logger logger = LogManager.getLogger(AiravataTask.class);
+
+ private AppCatalog appCatalog;
+ private ExperimentCatalog experimentCatalog;
+ private Publisher statusPublisher;
+ private ProcessModel processModel;
+
+ private ComputeResourceDescription computeResourceDescription;
+ private ComputeResourcePreference gatewayComputeResourcePreference;
+ private UserComputeResourcePreference userComputeResourcePreference;
+ private UserResourceProfile userResourceProfile;
+ private GatewayResourceProfile gatewayResourceProfile;
+
+ @TaskParam(name = "Process Id")
+ private String processId;
+
+ @TaskParam(name = "experimentId")
+ private String experimentId;
+
+ @TaskParam(name = "gatewayId")
+ private String gatewayId;
+
+ @TaskOutPort(name = "Success Port")
+ private OutPort onSuccess;
+
+
+ protected TaskResult onSuccess(String message) {
+ String successMessage = "Task " + getTaskId() + " completed." + message != null ? " Message : " + message : "";
+ logger.info(successMessage);
+ return onSuccess.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
+ }
+
+ protected TaskResult onFail(String reason, boolean fatal, Throwable error) {
+ String errorMessage;
+
+ if (error == null) {
+ errorMessage = "Task " + getTaskId() + " failed due to " + reason;
+ logger.error(errorMessage);
+ } else {
+ errorMessage = "Task " + getTaskId() + " failed due to " + reason + ", " + error.getMessage();
+ logger.error(errorMessage, error);
+ }
+ return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, errorMessage);
+
+ }
+
+ @Override
+ public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
+ super.init(manager, workflowName, jobName, taskName);
+ try {
+ appCatalog = RegistryFactory.getAppCatalog();
+ experimentCatalog = RegistryFactory.getDefaultExpCatalog();
+ processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
+
+ this.computeResourceDescription = getAppCatalog().getComputeResource().getComputeResource(getProcessModel()
+ .getComputeResourceId());
+ this.gatewayComputeResourcePreference = getAppCatalog().getGatewayProfile()
+ .getComputeResourcePreference(getGatewayId(), computeResourceDescription.getComputeResourceId());
+
+ this.userComputeResourcePreference = getAppCatalog().getUserResourceProfile()
+ .getUserComputeResourcePreference(getProcessModel().getUserName(), getGatewayId(), getProcessModel()
+ .getComputeResourceId());
+
+ this.userResourceProfile = getAppCatalog().getUserResourceProfile()
+ .getUserResourceProfile(getProcessModel().getUserName(), getGatewayId());
+
+ this.gatewayResourceProfile = getAppCatalog().getGatewayProfile().getGatewayProfile(getGatewayId());
+
+ } catch (AppCatalogException e) {
+ e.printStackTrace();
+ } catch (RegistryException e) {
+ e.printStackTrace();
+ }
+ }
+
+ protected AppCatalog getAppCatalog() {
+ return appCatalog;
+ }
+
+ protected void publishTaskState(TaskState ts) throws RegistryException {
+
+ TaskStatus taskStatus = new TaskStatus();
+ taskStatus.setState(ts);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, taskStatus, getTaskId());
+ TaskIdentifier identifier = new TaskIdentifier(getTaskId(),
+ getProcessId(), getExperimentId(), getGatewayId());
+ TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(ts,
+ identifier);
+ MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
+ (MessageType.TASK.name()), getGatewayId());
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ }
+
+
+ ///////////////////
+
+ public String getComputeResourceId() {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getComputeResourceId())) {
+ return userComputeResourcePreference.getComputeResourceId();
+ } else {
+ return gatewayComputeResourcePreference.getComputeResourceId();
+ }
+ }
+
+ public String getComputeResourceCredentialToken(){
+ if (isUseUserCRPref()) {
+ if (userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
+ return userComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+ } else {
+ return userResourceProfile.getCredentialStoreToken();
+ }
+ } else {
+ if (isValid(gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
+ return gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+ } else {
+ return gatewayResourceProfile.getCredentialStoreToken();
+ }
+ }
+ }
+
+ public String getComputeResourceLoginUserName(){
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getLoginUserName())) {
+ return userComputeResourcePreference.getLoginUserName();
+ } else if (isValid(getProcessModel().getProcessResourceSchedule().getOverrideLoginUserName())) {
+ return getProcessModel().getProcessResourceSchedule().getOverrideLoginUserName();
+ } else {
+ return gatewayComputeResourcePreference.getLoginUserName();
+ }
+ }
+
+ public JobSubmissionInterface getPreferredJobSubmissionInterface() throws AppCatalogException {
+ try {
+ JobSubmissionProtocol preferredJobSubmissionProtocol = getJobSubmissionProtocol();
+ ComputeResourceDescription resourceDescription = getComputeResourceDescription();
+ List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces();
+ Map<JobSubmissionProtocol, List<JobSubmissionInterface>> orderedInterfaces = new HashMap<>();
+ List<JobSubmissionInterface> interfaces = new ArrayList<>();
+ if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) {
+ for (JobSubmissionInterface submissionInterface : jobSubmissionInterfaces){
+
+ if (preferredJobSubmissionProtocol != null){
+ if (preferredJobSubmissionProtocol.toString().equals(submissionInterface.getJobSubmissionProtocol().toString())){
+ if (orderedInterfaces.containsKey(submissionInterface.getJobSubmissionProtocol())){
+ List<JobSubmissionInterface> interfaceList = orderedInterfaces.get(submissionInterface.getJobSubmissionProtocol());
+ interfaceList.add(submissionInterface);
+ }else {
+ interfaces.add(submissionInterface);
+ orderedInterfaces.put(submissionInterface.getJobSubmissionProtocol(), interfaces);
+ }
+ }
+ }else {
+ Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
+ @Override
+ public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
+ return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
+ }
+ });
+ }
+ }
+ interfaces = orderedInterfaces.get(preferredJobSubmissionProtocol);
+ Collections.sort(interfaces, new Comparator<JobSubmissionInterface>() {
+ @Override
+ public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
+ return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
+ }
+ });
+ } else {
+ throw new AppCatalogException("Compute resource should have at least one job submission interface defined...");
+ }
+ return interfaces.get(0);
+ } catch (AppCatalogException e) {
+ throw new AppCatalogException("Error occurred while retrieving data from app catalog", e);
+ }
+ }
+
+ //////////////////////////
+
+
+ protected boolean isValid(String str) {
+ return str != null && !str.trim().isEmpty();
+ }
+
+ public boolean isUseUserCRPref() {
+ return getProcessModel().isUseUserCRPref();
+ }
+
+ public JobSubmissionProtocol getJobSubmissionProtocol() {
+ return getGatewayComputeResourcePreference().getPreferredJobSubmissionProtocol();
+ }
+
+ public ComputeResourcePreference getGatewayComputeResourcePreference() {
+ return gatewayComputeResourcePreference;
+ }
+
+
+ public ComputeResourceDescription getComputeResourceDescription() {
+ return computeResourceDescription;
+ }
+
+ ////////////////////////
+
+
+ public void setAppCatalog(AppCatalog appCatalog) {
+ this.appCatalog = appCatalog;
+ }
+
+ public ExperimentCatalog getExperimentCatalog() {
+ return experimentCatalog;
+ }
+
+ public void setExperimentCatalog(ExperimentCatalog experimentCatalog) {
+ this.experimentCatalog = experimentCatalog;
+ }
+
+ public Publisher getStatusPublisher() {
+ return statusPublisher;
+ }
+
+ public void setStatusPublisher(Publisher statusPublisher) {
+ this.statusPublisher = statusPublisher;
+ }
+
+ public String getProcessId() {
+ return processId;
+ }
+
+ public void setProcessId(String processId) {
+ this.processId = processId;
+ }
+
+ public String getExperimentId() {
+ return experimentId;
+ }
+
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+
+ public String getGatewayId() {
+ return gatewayId;
+ }
+
+ public void setGatewayId(String gatewayId) {
+ this.gatewayId = gatewayId;
+ }
+
+ public ProcessModel getProcessModel() {
+ return processModel;
+ }
+
+ public void setProcessModel(ProcessModel processModel) {
+ this.processModel = processModel;
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java
new file mode 100644
index 0000000..346aa73
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java
@@ -0,0 +1,19 @@
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.task.TaskResult;
+
+@TaskDef(name = "Data Staging Task")
+public class DataStagingTask extends AiravataTask {
+
+ @Override
+ public TaskResult onRun(TaskHelper taskHelper) {
+ return null;
+ }
+
+ @Override
+ public void onCancel() {
+
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
new file mode 100644
index 0000000..1cab0e2
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
@@ -0,0 +1,64 @@
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
+import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.helix.task.TaskResult;
+
+@TaskDef(name = "Environment Setup Task")
+public class EnvSetupTask extends AiravataTask {
+
+ @TaskParam(name = "Working Directory")
+ private String workingDirectory;
+
+ @TaskOutPort(name = "Success Out Port")
+ private OutPort successPort;
+
+ @Override
+ public TaskResult onRun(TaskHelper taskHelper) {
+ try {
+ publishTaskState(TaskState.EXECUTING);
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(),
+ getJobSubmissionProtocol().name(), getComputeResourceCredentialToken());
+
+ adaptor.createDirectory(workingDirectory);
+ publishTaskState(TaskState.COMPLETED);
+ return successPort.invoke(new TaskResult(TaskResult.Status.COMPLETED, "Successfully completed"));
+ } catch (Exception e) {
+ try {
+ publishTaskState(TaskState.FAILED);
+ } catch (RegistryException e1) {
+ publishErrors(e1);
+ // ignore silently
+ }
+ publishErrors(e);
+ return new TaskResult(TaskResult.Status.FAILED, "Failed the task");
+ }
+ }
+
+ @Override
+ public void onCancel() {
+
+ }
+
+ public String getWorkingDirectory() {
+ return workingDirectory;
+ }
+
+ public void setWorkingDirectory(String workingDirectory) {
+ this.workingDirectory = workingDirectory;
+ }
+
+ public OutPort getSuccessPort() {
+ return successPort;
+ }
+
+ public void setSuccessPort(OutPort successPort) {
+ this.successPort = successPort;
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
new file mode 100644
index 0000000..ec75fb7
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
@@ -0,0 +1,415 @@
+package org.apache.airavata.helix.impl.task.submission;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GroovyMapData {
+
+ @ScriptTag(name = "inputDir")
+ private String inputDir;
+
+ @ScriptTag(name = "outputDir")
+ private String outputDir;
+
+ @ScriptTag(name = "executablePath")
+ private String executablePath;
+
+ @ScriptTag(name = "standardOutFile")
+ private String stdoutFile;
+
+ @ScriptTag(name = "standardErrorFile")
+ private String stderrFile;
+
+ @ScriptTag(name = "scratchLocation")
+ private String scratchLocation;
+
+ @ScriptTag(name = "gatewayId")
+ private String gatewayId;
+
+ @ScriptTag(name = "gatewayUserName")
+ private String gatewayUserName;
+
+ @ScriptTag(name = "applicationName")
+ private String applicationName;
+
+ @ScriptTag(name = "queueSpecificMacros")
+ private String queueSpecificMacros;
+
+ @ScriptTag(name = "accountString")
+ private String accountString;
+
+ @ScriptTag(name = "reservation")
+ private String reservation;
+
+ @ScriptTag(name = "jobName")
+ private String jobName;
+
+ @ScriptTag(name = "workingDirectory")
+ private String workingDirectory;
+
+ @ScriptTag(name = "inputs")
+ private List<String> inputs;
+
+ @ScriptTag(name = "inputsAll")
+ private List<String> inputsAll;
+
+ @ScriptTag(name = "userName")
+ private String userName;
+
+ @ScriptTag(name = "shellName")
+ private String shellName;
+
+ @ScriptTag(name = "maxWallTime")
+ private String maxWallTime;
+
+ @ScriptTag(name = "qualityOfService")
+ private String qualityOfService;
+
+ @ScriptTag(name = "queueName")
+ private String queueName;
+
+ @ScriptTag(name = "nodes")
+ private Integer nodes;
+
+ @ScriptTag(name = "processPerNode")
+ private Integer processPerNode;
+
+ @ScriptTag(name = "cpuCount")
+ private Integer cpuCount;
+
+ @ScriptTag(name = "usedMem")
+ private Integer usedMem;
+
+ @ScriptTag(name = "mailAddress")
+ private String mailAddress;
+
+ @ScriptTag(name = "exports")
+ private List<String> exports;
+
+ @ScriptTag(name = "moduleCommands")
+ private List<String> moduleCommands;
+
+ @ScriptTag(name = "preJobCommands")
+ private List<String> preJobCommands;
+
+ @ScriptTag(name = "postJobCommands")
+ private List<String> postJobCommands;
+
+ @ScriptTag(name = "jobSubmitterCommand")
+ private String jobSubmitterCommand;
+
+ @ScriptTag(name = "chassisName")
+ private String chassisName;
+
+
+ public Map<String, Object> getMap() {
+
+ Map<String, Object> map = new HashMap<>();
+ Field[] fields = this.getClass().getDeclaredFields();
+
+ for (Field field : fields) {
+ ScriptTag scriptTag = field.getAnnotation(ScriptTag.class);
+ if (scriptTag != null) {
+ field.setAccessible(true);
+ try {
+ map.put(scriptTag.name(), field.get(this));
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ // ignore silently
+ }
+ }
+ }
+
+ return map;
+ }
+
+ public String getInputDir() {
+ return inputDir;
+ }
+
+ public GroovyMapData setInputDir(String inputDir) {
+ this.inputDir = inputDir;
+ return this;
+ }
+
+ public String getOutputDir() {
+ return outputDir;
+ }
+
+ public GroovyMapData setOutputDir(String outputDir) {
+ this.outputDir = outputDir;
+ return this;
+ }
+
+ public String getExecutablePath() {
+ return executablePath;
+ }
+
+ public GroovyMapData setExecutablePath(String executablePath) {
+ this.executablePath = executablePath;
+ return this;
+ }
+
+ public String getStdoutFile() {
+ return stdoutFile;
+ }
+
+ public GroovyMapData setStdoutFile(String stdoutFile) {
+ this.stdoutFile = stdoutFile;
+ return this;
+ }
+
+ public String getStderrFile() {
+ return stderrFile;
+ }
+
+ public GroovyMapData setStderrFile(String stderrFile) {
+ this.stderrFile = stderrFile;
+ return this;
+ }
+
+ public String getScratchLocation() {
+ return scratchLocation;
+ }
+
+ public GroovyMapData setScratchLocation(String scratchLocation) {
+ this.scratchLocation = scratchLocation;
+ return this;
+ }
+
+ public String getGatewayId() {
+ return gatewayId;
+ }
+
+ public GroovyMapData setGatewayId(String gatewayId) {
+ this.gatewayId = gatewayId;
+ return this;
+ }
+
+ public String getGatewayUserName() {
+ return gatewayUserName;
+ }
+
+ public GroovyMapData setGatewayUserName(String gatewayUserName) {
+ this.gatewayUserName = gatewayUserName;
+ return this;
+ }
+
+ public String getApplicationName() {
+ return applicationName;
+ }
+
+ public GroovyMapData setApplicationName(String applicationName) {
+ this.applicationName = applicationName;
+ return this;
+ }
+
+ public String getQueueSpecificMacros() {
+ return queueSpecificMacros;
+ }
+
+ public GroovyMapData setQueueSpecificMacros(String queueSpecificMacros) {
+ this.queueSpecificMacros = queueSpecificMacros;
+ return this;
+ }
+
+ public String getAccountString() {
+ return accountString;
+ }
+
+ public GroovyMapData setAccountString(String accountString) {
+ this.accountString = accountString;
+ return this;
+ }
+
+ public String getReservation() {
+ return reservation;
+ }
+
+ public GroovyMapData setReservation(String reservation) {
+ this.reservation = reservation;
+ return this;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public GroovyMapData setJobName(String jobName) {
+ this.jobName = jobName;
+ return this;
+ }
+
+ public String getWorkingDirectory() {
+ return workingDirectory;
+ }
+
+ public GroovyMapData setWorkingDirectory(String workingDirectory) {
+ this.workingDirectory = workingDirectory;
+ return this;
+ }
+
+ public List<String> getInputs() {
+ return inputs;
+ }
+
+ public GroovyMapData setInputs(List<String> inputs) {
+ this.inputs = inputs;
+ return this;
+ }
+
+ public List<String> getInputsAll() {
+ return inputsAll;
+ }
+
+ public GroovyMapData setInputsAll(List<String> inputsAll) {
+ this.inputsAll = inputsAll;
+ return this;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public GroovyMapData setUserName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public String getShellName() {
+ return shellName;
+ }
+
+ public GroovyMapData setShellName(String shellName) {
+ this.shellName = shellName;
+ return this;
+ }
+
+ public String getMaxWallTime() {
+ return maxWallTime;
+ }
+
+ public GroovyMapData setMaxWallTime(String maxWallTime) {
+ this.maxWallTime = maxWallTime;
+ return this;
+ }
+
+ public String getQualityOfService() {
+ return qualityOfService;
+ }
+
+ public GroovyMapData setQualityOfService(String qualityOfService) {
+ this.qualityOfService = qualityOfService;
+ return this;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public GroovyMapData setQueueName(String queueName) {
+ this.queueName = queueName;
+ return this;
+ }
+
+ public Integer getNodes() {
+ return nodes;
+ }
+
+ public GroovyMapData setNodes(Integer nodes) {
+ this.nodes = nodes;
+ return this;
+ }
+
+ public Integer getProcessPerNode() {
+ return processPerNode;
+ }
+
+ public GroovyMapData setProcessPerNode(Integer processPerNode) {
+ this.processPerNode = processPerNode;
+ return this;
+ }
+
+ public Integer getCpuCount() {
+ return cpuCount;
+ }
+
+ public GroovyMapData setCpuCount(Integer cpuCount) {
+ this.cpuCount = cpuCount;
+ return this;
+ }
+
+ public Integer getUsedMem() {
+ return usedMem;
+ }
+
+ public GroovyMapData setUsedMem(Integer usedMem) {
+ this.usedMem = usedMem;
+ return this;
+ }
+
+ public String getMailAddress() {
+ return mailAddress;
+ }
+
+ public GroovyMapData setMailAddress(String mailAddress) {
+ this.mailAddress = mailAddress;
+ return this;
+ }
+
+ public List<String> getExports() {
+ return exports;
+ }
+
+ public GroovyMapData setExports(List<String> exports) {
+ this.exports = exports;
+ return this;
+ }
+
+ public List<String> getModuleCommands() {
+ return moduleCommands;
+ }
+
+ public GroovyMapData setModuleCommands(List<String> moduleCommands) {
+ this.moduleCommands = moduleCommands;
+ return this;
+ }
+
+ public List<String> getPreJobCommands() {
+ return preJobCommands;
+ }
+
+ public GroovyMapData setPreJobCommands(List<String> preJobCommands) {
+ this.preJobCommands = preJobCommands;
+ return this;
+ }
+
+ public List<String> getPostJobCommands() {
+ return postJobCommands;
+ }
+
+ public GroovyMapData setPostJobCommands(List<String> postJobCommands) {
+ this.postJobCommands = postJobCommands;
+ return this;
+ }
+
+ public String getJobSubmitterCommand() {
+ return jobSubmitterCommand;
+ }
+
+ public GroovyMapData setJobSubmitterCommand(String jobSubmitterCommand) {
+ this.jobSubmitterCommand = jobSubmitterCommand;
+ return this;
+ }
+
+ public String getChassisName() {
+ return chassisName;
+ }
+
+ public GroovyMapData setChassisName(String chassisName) {
+ this.chassisName = chassisName;
+ return this;
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/Script.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/Script.java
new file mode 100644
index 0000000..208e9e5
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/Script.java
@@ -0,0 +1,43 @@
+package org.apache.airavata.helix.impl.task.submission;
+
+public enum Script {
+
+ SHELL_NAME("shellName"),
+ QUEUE_NAME("queueName"),
+ NODES("nodes"),
+ CPU_COUNT("cpuCount"),
+ MAIL_ADDRESS("mailAddress"),
+ ACCOUNT_STRING("accountString"),
+ MAX_WALL_TIME("maxWallTime"),
+ JOB_NAME("jobName"),
+ STANDARD_OUT_FILE("standardOutFile"),
+ STANDARD_ERROR_FILE("standardErrorFile"),
+ QUALITY_OF_SERVICE("qualityOfService"),
+ RESERVATION("reservation"),
+ EXPORTS("exports"),
+ MODULE_COMMANDS("moduleCommands"),
+ SCRATCH_LOCATION("scratchLocation"),
+ WORKING_DIR("workingDirectory"),
+ PRE_JOB_COMMANDS("preJobCommands"),
+ JOB_SUBMITTER_COMMAND("jobSubmitterCommand"),
+ EXECUTABLE_PATH("executablePath"),
+ INPUTS("inputs"),
+ INPUTS_ALL("inputsAll"),
+ POST_JOB_COMMANDS("postJobCommands"),
+ USED_MEM("usedMem"),
+ PROCESS_PER_NODE("processPerNode"),
+ CHASSIS_NAME("chassisName"),
+ INPUT_DIR("inputDir"),
+ OUTPUT_DIR("outputDir"),
+ USER_NAME("userName"),
+ GATEWAY_ID("gatewayId"),
+ GATEWAY_USER_NAME("gatewayUserName"),
+ APPLICATION_NAME("applicationName"),
+ QUEUE_SPECIFIC_MACROS("queueSpecificMacros")
+ ;
+
+ String name;
+ Script(String name) {
+ this.name = name;
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ScriptTag.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ScriptTag.java
new file mode 100644
index 0000000..c03c11f
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ScriptTag.java
@@ -0,0 +1,13 @@
+package org.apache.airavata.helix.impl.task.submission;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface ScriptTag {
+ public String name();
+ public boolean mandatory() default false;
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/SubmissionUtil.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/SubmissionUtil.java
new file mode 100644
index 0000000..e2cbfee
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/SubmissionUtil.java
@@ -0,0 +1,10 @@
+package org.apache.airavata.helix.impl.task.submission;
+
+import java.io.File;
+
+public class SubmissionUtil {
+
+ public static File createJobFile(GroovyMapData mapData) {
+ return null;
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobFactory.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobFactory.java
new file mode 100644
index 0000000..b04ffd8
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobFactory.java
@@ -0,0 +1,102 @@
+package org.apache.airavata.helix.impl.task.submission.config;
+
+import org.apache.airavata.helix.impl.task.submission.config.imp.*;
+import org.apache.airavata.helix.impl.task.submission.config.imp.parser.*;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.RegistryException;
+
+public class JobFactory {
+
+ public static String getTemplateFileName(ResourceJobManagerType resourceJobManagerType) {
+ switch (resourceJobManagerType) {
+ case FORK:
+ return "FORK_Groovy.template";
+ case PBS:
+ return "PBS_Groovy.template";
+ case SLURM:
+ return "SLURM_Groovy.template";
+ case UGE:
+ return "UGE_Groovy.template";
+ case LSF:
+ return "LSF_Groovy.template";
+ case CLOUD:
+ return "CLOUD_Groovy.template";
+ default:
+ return null;
+ }
+ }
+
+ public static ResourceJobManager getResourceJobManager(AppCatalog appCatalog, JobSubmissionProtocol submissionProtocol, JobSubmissionInterface jobSubmissionInterface) {
+ try {
+ if (submissionProtocol == JobSubmissionProtocol.SSH ) {
+ SSHJobSubmission sshJobSubmission = getSSHJobSubmission(appCatalog, jobSubmissionInterface.getJobSubmissionInterfaceId());
+ if (sshJobSubmission != null) {
+ return sshJobSubmission.getResourceJobManager();
+ }
+ } else if (submissionProtocol == JobSubmissionProtocol.LOCAL) {
+ LOCALSubmission localJobSubmission = getLocalJobSubmission(appCatalog, jobSubmissionInterface.getJobSubmissionInterfaceId());
+ if (localJobSubmission != null) {
+ return localJobSubmission.getResourceJobManager();
+ }
+ } else if (submissionProtocol == JobSubmissionProtocol.SSH_FORK){
+ SSHJobSubmission sshJobSubmission = getSSHJobSubmission(appCatalog, jobSubmissionInterface.getJobSubmissionInterfaceId());
+ if (sshJobSubmission != null) {
+ return sshJobSubmission.getResourceJobManager();
+ }
+ }
+ } catch (AppCatalogException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ public static LOCALSubmission getLocalJobSubmission(AppCatalog appCatalog, String submissionId) throws AppCatalogException {
+ try {
+ return appCatalog.getComputeResource().getLocalJobSubmission(submissionId);
+ } catch (Exception e) {
+ String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
+ throw new AppCatalogException(errorMsg, e);
+ }
+ }
+
+ public static SSHJobSubmission getSSHJobSubmission(AppCatalog appCatalog, String submissionId) throws AppCatalogException {
+ try {
+ return appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
+ } catch (Exception e) {
+ String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
+ throw new AppCatalogException(errorMsg, e);
+ }
+ }
+
+ public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws Exception {
+ if(resourceJobManager == null)
+ return null;
+
+
+ String templateFileName = getTemplateFileName(resourceJobManager.getResourceJobManagerType());
+ switch (resourceJobManager.getResourceJobManagerType()) {
+ case PBS:
+ return new PBSJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
+ resourceJobManager.getJobManagerCommands(), new PBSOutputParser());
+ case SLURM:
+ return new SlurmJobConfiguration(templateFileName, ".slurm", resourceJobManager
+ .getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), new SlurmOutputParser());
+ case LSF:
+ return new LSFJobConfiguration(templateFileName, ".lsf", resourceJobManager.getJobManagerBinPath(),
+ resourceJobManager.getJobManagerCommands(), new LSFOutputParser());
+ case UGE:
+ return new UGEJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
+ resourceJobManager.getJobManagerCommands(), new UGEOutputParser());
+ case FORK:
+ return new ForkJobConfiguration(templateFileName, ".sh", resourceJobManager.getJobManagerBinPath(),
+ resourceJobManager.getJobManagerCommands(), new ForkOutputParser());
+ // We don't have a job configuration manager for CLOUD type
+ default:
+ return null;
+ }
+
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobManagerConfiguration.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobManagerConfiguration.java
new file mode 100644
index 0000000..1fafb00
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobManagerConfiguration.java
@@ -0,0 +1,29 @@
+package org.apache.airavata.helix.impl.task.submission.config;
+
+public interface JobManagerConfiguration {
+
+ public RawCommandInfo getCancelCommand(String jobID);
+
+ public String getJobDescriptionTemplateName();
+
+ public RawCommandInfo getMonitorCommand(String jobID);
+
+ public RawCommandInfo getUserBasedMonitorCommand(String userName);
+
+ public RawCommandInfo getJobIdMonitorCommand(String jobName , String userName);
+
+ public String getScriptExtension();
+
+ public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath);
+
+ public OutputParser getParser();
+
+ public String getInstalledPath();
+
+ public String getBaseCancelCommand();
+
+ public String getBaseMonitorCommand();
+
+ public String getBaseSubmitCommand();
+
+}
\ No newline at end of file
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/OutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/OutputParser.java
new file mode 100644
index 0000000..41e8892
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/OutputParser.java
@@ -0,0 +1,41 @@
+package org.apache.airavata.helix.impl.task.submission.config;
+
+import org.apache.airavata.model.status.JobStatus;
+
+import java.util.Map;
+
+public interface OutputParser {
+
+ /**
+ * This can be used to parseSingleJob the result of a job submission to get the JobID
+ * @param rawOutput
+ * @return the job id as a String, or null if no job id found
+ */
+ public String parseJobSubmission(String rawOutput) throws Exception;
+
+
+ /**
+ * Parse output return by job submission task and identify jobSubmission failures.
+ * @param rawOutput
+ * @return true if job submission has been failed, false otherwise.
+ */
+ public boolean isJobSubmissionFailed(String rawOutput);
+
+
+ /**
+ * This can be used to get the job status from the output
+ * @param jobID
+ * @param rawOutput
+ */
+ public JobStatus parseJobStatus(String jobID, String rawOutput) throws Exception;
+
+ /**
+ * This can be used to parseSingleJob a big output and get multipleJob statuses
+ * @param statusMap list of status map will return and key will be the job ID
+ * @param rawOutput
+ */
+ public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws Exception;
+
+
+ public String parseJobId(String jobName, String rawOutput) throws Exception;
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/RawCommandInfo.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/RawCommandInfo.java
new file mode 100644
index 0000000..d7f9fb3
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/RawCommandInfo.java
@@ -0,0 +1,22 @@
+package org.apache.airavata.helix.impl.task.submission.config;
+
+public class RawCommandInfo {
+
+ private String rawCommand;
+
+ public RawCommandInfo(String cmd) {
+ this.rawCommand = cmd;
+ }
+
+ public String getCommand() {
+ return this.rawCommand;
+ }
+
+ public String getRawCommand() {
+ return rawCommand;
+ }
+
+ public void setRawCommand(String rawCommand) {
+ this.rawCommand = rawCommand;
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/ForkJobConfiguration.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/ForkJobConfiguration.java
new file mode 100644
index 0000000..d25f17f
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/ForkJobConfiguration.java
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp;
+
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.commons.io.FilenameUtils;
+
+import java.io.File;
+import java.util.Map;
+
+public class ForkJobConfiguration implements JobManagerConfiguration {
+ private final Map<JobManagerCommand, String> jobManagerCommands;
+ private String jobDescriptionTemplateName;
+ private String scriptExtension;
+ private String installedPath;
+ private OutputParser parser;
+
+ public ForkJobConfiguration (String jobDescriptionTemplateName, String scriptExtension, String installedPath,
+ Map<JobManagerCommand, String> jobManagerCommands, OutputParser parser){
+ this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+ this.scriptExtension = scriptExtension;
+ this.parser = parser;
+ installedPath = installedPath.trim();
+ if (installedPath.endsWith("/")) {
+ this.installedPath = installedPath;
+ } else {
+ this.installedPath = installedPath + "/";
+ }
+ this.jobManagerCommands = jobManagerCommands;
+ }
+
+ @Override
+ public RawCommandInfo getCancelCommand(String jobID) {
+ return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.DELETION).trim() + " " +
+ jobID);
+ }
+
+ @Override
+ public String getJobDescriptionTemplateName() {
+ return jobDescriptionTemplateName;
+ }
+
+ @Override
+ public RawCommandInfo getMonitorCommand(String jobID) {
+ return null;
+ }
+
+ @Override
+ public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+ return null;
+ }
+
+ @Override
+ public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+ return null;
+ }
+
+ @Override
+ public String getScriptExtension() {
+ return scriptExtension;
+ }
+
+ @Override
+ public RawCommandInfo getSubmitCommand(String workingDirectory, String forkFilePath) {
+ return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.SUBMISSION).trim() + " " +
+ workingDirectory + File.separator + FilenameUtils.getName(forkFilePath));
+ }
+
+ @Override
+ public OutputParser getParser() {
+ return parser;
+ }
+
+ @Override
+ public String getInstalledPath() {
+ return installedPath;
+ }
+
+ @Override
+ public String getBaseCancelCommand() {
+ return null;
+ }
+
+ @Override
+ public String getBaseMonitorCommand() {
+ return null;
+ }
+
+ @Override
+ public String getBaseSubmitCommand() {
+ return null;
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/JobUtil.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/JobUtil.java
new file mode 100644
index 0000000..36bce60
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/JobUtil.java
@@ -0,0 +1,58 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp;
+
+import org.apache.airavata.model.status.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobUtil {
+ private static final Logger log = LoggerFactory.getLogger(JobUtil.class);
+
+ public static JobState getJobState(String status) {
+ log.info("parsing the job status returned : " + status);
+ if (status != null) {
+ if ("C".equals(status) || "CD".equals(status) || "E".equals(status) || "CG".equals(status) || "DONE".equals(status)) {
+ return JobState.COMPLETE;
+// } else if ("H".equals(status) || "h".equals(status)) {
+// return JobState.HELD;
+ } else if ("Q".equals(status) || "qw".equals(status) || "PEND".equals(status)) {
+ return JobState.QUEUED;
+ } else if ("R".equals(status) || "CF".equals(status) || "r".equals(status) || "RUN".equals(status)) {
+ return JobState.ACTIVE;
+// } else if ("T".equals(status)) {
+// return JobState.HELD;
+ } else if ("W".equals(status) || "PD".equals(status)) {
+ return JobState.QUEUED;
+ } else if ("S".equals(status) || "PSUSP".equals(status) || "USUSP".equals(status) || "SSUSP".equals(status)) {
+ return JobState.SUSPENDED;
+ } else if ("CA".equals(status)) {
+ return JobState.CANCELED;
+ } else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status) || "EXIT".equals(status)) {
+ return JobState.FAILED;
+ } else if ("PR".equals(status) || "Er".equals(status)) {
+ return JobState.FAILED;
+ } else if ("U".equals(status) || ("UNKWN".equals(status))) {
+ return JobState.UNKNOWN;
+ }
+ }
+ return JobState.UNKNOWN;
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/LSFJobConfiguration.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/LSFJobConfiguration.java
new file mode 100644
index 0000000..bccd7ee
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/LSFJobConfiguration.java
@@ -0,0 +1,120 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp;
+
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.commons.io.FilenameUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+public class LSFJobConfiguration implements JobManagerConfiguration {
+ private final static Logger logger = LoggerFactory.getLogger(LSFJobConfiguration.class);
+ private final Map<JobManagerCommand, String> jobMangerCommands;
+ private String jobDescriptionTemplateName;
+ private String scriptExtension;
+ private String installedPath;
+ private OutputParser parser;
+
+ public LSFJobConfiguration(String jobDescriptionTemplateName,
+ String scriptExtension, String installedPath, Map<JobManagerCommand, String>
+ jobManagerCommands, OutputParser parser) {
+ this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+ this.scriptExtension = scriptExtension;
+ this.parser = parser;
+ if (installedPath.endsWith("/") || installedPath.isEmpty()) {
+ this.installedPath = installedPath;
+ } else {
+ this.installedPath = installedPath + "/";
+ }
+ this.jobMangerCommands = jobManagerCommands;
+ }
+
+ @Override
+ public RawCommandInfo getCancelCommand(String jobID) {
+ return new RawCommandInfo(this.installedPath + "bkill " + jobID);
+ }
+
+ @Override
+ public String getJobDescriptionTemplateName() {
+ return jobDescriptionTemplateName;
+ }
+
+ @Override
+ public RawCommandInfo getMonitorCommand(String jobID) {
+ return new RawCommandInfo(this.installedPath + "bjobs " + jobID);
+ }
+
+ @Override
+ public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+ return new RawCommandInfo(this.installedPath + "bjobs -u " + userName);
+ }
+
+ @Override
+ public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+ return new RawCommandInfo(this.installedPath + "bjobs -J " + jobName);
+ }
+
+ @Override
+ public String getScriptExtension() {
+ return scriptExtension;
+ }
+
+ @Override
+ public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath) {
+ return new RawCommandInfo(this.installedPath + "bsub < " +
+ workingDirectory + File.separator + FilenameUtils.getName(pbsFilePath));
+ }
+
+ @Override
+ public OutputParser getParser() {
+ return parser;
+ }
+
+ public void setParser(OutputParser parser) {
+ this.parser = parser;
+ }
+
+ @Override
+ public String getInstalledPath() {
+ return installedPath;
+ }
+
+
+ @Override
+ public String getBaseCancelCommand() {
+ return "bkill";
+ }
+
+ @Override
+ public String getBaseMonitorCommand() {
+ return "bjobs";
+ }
+
+ @Override
+ public String getBaseSubmitCommand() {
+ return "bsub";
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/PBSJobConfiguration.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/PBSJobConfiguration.java
new file mode 100644
index 0000000..aeedeb9
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/PBSJobConfiguration.java
@@ -0,0 +1,122 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp;
+
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.commons.io.FilenameUtils;
+
+import java.io.File;
+import java.util.Map;
+
+public class PBSJobConfiguration implements JobManagerConfiguration {
+
+ private final Map<JobManagerCommand, String> jobManagerCommands;
+ private String jobDescriptionTemplateName;
+ private String scriptExtension;
+ private String installedPath;
+ private OutputParser parser;
+
+ public PBSJobConfiguration(String jobDescriptionTemplateName, String scriptExtension, String installedPath,
+ Map<JobManagerCommand, String> jobManagerCommands, OutputParser parser) {
+ this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+ this.scriptExtension = scriptExtension;
+ this.parser = parser;
+ installedPath = installedPath.trim();
+ if (installedPath.endsWith("/")) {
+ this.installedPath = installedPath;
+ } else {
+ this.installedPath = installedPath + "/";
+ }
+ this.jobManagerCommands = jobManagerCommands;
+ }
+
+ public RawCommandInfo getCancelCommand(String jobID) {
+ return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.DELETION).trim() + " " +
+ jobID);
+ }
+
+ public String getJobDescriptionTemplateName() {
+ return jobDescriptionTemplateName;
+ }
+
+ public void setJobDescriptionTemplateName(String jobDescriptionTemplateName) {
+ this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+ }
+
+ public RawCommandInfo getMonitorCommand(String jobID) {
+ return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.JOB_MONITORING).trim()
+ + " -f " + jobID);
+ }
+
+ public String getScriptExtension() {
+ return scriptExtension;
+ }
+
+ public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath) {
+ return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.SUBMISSION).trim() + " " +
+ workingDirectory + File.separator + FilenameUtils.getName(pbsFilePath));
+ }
+
+ public String getInstalledPath() {
+ return installedPath;
+ }
+
+ public void setInstalledPath(String installedPath) {
+ this.installedPath = installedPath;
+ }
+
+ public OutputParser getParser() {
+ return parser;
+ }
+
+ public void setParser(OutputParser parser) {
+ this.parser = parser;
+ }
+
+ public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+ return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.JOB_MONITORING).trim()
+ + " -u " + userName);
+ }
+
+ @Override
+ public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+ // For PBS there is no option to get jobDetails by JobName, so we search with userName
+ return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.JOB_MONITORING).trim()
+ + " -u " + userName + " -f | grep \"Job_Name = " + jobName + "\" -B1");
+ }
+
+ @Override
+ public String getBaseCancelCommand() {
+ return jobManagerCommands.get(JobManagerCommand.DELETION).trim();
+ }
+
+ @Override
+ public String getBaseMonitorCommand() {
+ return jobManagerCommands.get(JobManagerCommand.JOB_MONITORING).trim();
+ }
+
+ @Override
+ public String getBaseSubmitCommand() {
+ return jobManagerCommands.get(JobManagerCommand.SUBMISSION).trim();
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/SlurmJobConfiguration.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/SlurmJobConfiguration.java
new file mode 100644
index 0000000..fc431ce
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/SlurmJobConfiguration.java
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp;
+
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.commons.io.FilenameUtils;
+
+import java.io.File;
+import java.util.Map;
+
+public class SlurmJobConfiguration implements JobManagerConfiguration {
+ private final Map<JobManagerCommand, String> jMCommands;
+ private String jobDescriptionTemplateName;
+ private String scriptExtension;
+ private String installedPath;
+ private OutputParser parser;
+
+ public SlurmJobConfiguration(String jobDescriptionTemplateName,
+ String scriptExtension, String installedPath, Map<JobManagerCommand, String>
+ jobManagerCommands, OutputParser parser) {
+ this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+ this.scriptExtension = scriptExtension;
+ this.parser = parser;
+ installedPath = installedPath.trim();
+ if (installedPath.endsWith("/")) {
+ this.installedPath = installedPath;
+ } else {
+ this.installedPath = installedPath + "/";
+ }
+ this.jMCommands = jobManagerCommands;
+ }
+
+ public RawCommandInfo getCancelCommand(String jobID) {
+ return new RawCommandInfo(this.installedPath + jMCommands.get(JobManagerCommand.DELETION).trim() + " " + jobID);
+ }
+
+ public String getJobDescriptionTemplateName() {
+ return jobDescriptionTemplateName;
+ }
+
+ public void setJobDescriptionTemplateName(String jobDescriptionTemplateName) {
+ this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+ }
+
+ public RawCommandInfo getMonitorCommand(String jobID) {
+ return new RawCommandInfo(this.installedPath + jMCommands.get(JobManagerCommand.JOB_MONITORING).trim() + " -j " + jobID);
+ }
+
+ public String getScriptExtension() {
+ return scriptExtension;
+ }
+
+ public RawCommandInfo getSubmitCommand(String workingDirectory,String pbsFilePath) {
+ return new RawCommandInfo(this.installedPath + jMCommands.get(JobManagerCommand.SUBMISSION).trim() + " " +
+ workingDirectory + File.separator + FilenameUtils.getName(pbsFilePath));
+ }
+
+ public String getInstalledPath() {
+ return installedPath;
+ }
+
+ public void setInstalledPath(String installedPath) {
+ this.installedPath = installedPath;
+ }
+
+ public OutputParser getParser() {
+ return parser;
+ }
+
+ public void setParser(OutputParser parser) {
+ this.parser = parser;
+ }
+
+ public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+ return new RawCommandInfo(this.installedPath + jMCommands.get(JobManagerCommand.JOB_MONITORING).trim() + " -u " + userName);
+ }
+
+ @Override
+ public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+ return new RawCommandInfo(this.installedPath + jMCommands.get(JobManagerCommand.JOB_MONITORING).trim() + " -n " + jobName + " -u " + userName);
+ }
+
+ @Override
+ public String getBaseCancelCommand() {
+ return jMCommands.get(JobManagerCommand.DELETION).trim();
+ }
+
+ @Override
+ public String getBaseMonitorCommand() {
+ return jMCommands.get(JobManagerCommand.JOB_MONITORING).trim();
+ }
+
+ @Override
+ public String getBaseSubmitCommand() {
+ return jMCommands.get(JobManagerCommand.SUBMISSION).trim();
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/UGEJobConfiguration.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/UGEJobConfiguration.java
new file mode 100644
index 0000000..6a12966
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/UGEJobConfiguration.java
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp;
+
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.commons.io.FilenameUtils;
+
+import java.io.File;
+import java.util.Map;
+
+public class UGEJobConfiguration implements JobManagerConfiguration {
+ private final Map<JobManagerCommand, String> jobManagerCommands;
+ private String jobDescriptionTemplateName;
+ private String scriptExtension;
+ private String installedPath;
+ private OutputParser parser;
+
+ public UGEJobConfiguration(String jobDescriptionTemplateName,
+ String scriptExtension, String installedPath, Map<JobManagerCommand, String>
+ jobManagerCommands, OutputParser parser) {
+ this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+ this.scriptExtension = scriptExtension;
+ this.parser = parser;
+ if (installedPath.endsWith("/")) {
+ this.installedPath = installedPath;
+ } else {
+ this.installedPath = installedPath + "/";
+ }
+ this.jobManagerCommands = jobManagerCommands;
+ }
+
+ public RawCommandInfo getCancelCommand(String jobID) {
+ return new RawCommandInfo(this.installedPath + "qdel " + jobID);
+ }
+
+ public String getJobDescriptionTemplateName() {
+ return jobDescriptionTemplateName;
+ }
+
+ public void setJobDescriptionTemplateName(String jobDescriptionTemplateName) {
+ this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+ }
+
+ public RawCommandInfo getMonitorCommand(String jobID) {
+ return new RawCommandInfo(this.installedPath + "qstat -j " + jobID);
+ }
+
+ public String getScriptExtension() {
+ return scriptExtension;
+ }
+
+ public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath) {
+ return new RawCommandInfo(this.installedPath + "qsub " +
+ workingDirectory + File.separator + FilenameUtils.getName(pbsFilePath));
+ }
+
+ public String getInstalledPath() {
+ return installedPath;
+ }
+
+ public void setInstalledPath(String installedPath) {
+ this.installedPath = installedPath;
+ }
+
+ public OutputParser getParser() {
+ return parser;
+ }
+
+ public void setParser(OutputParser parser) {
+ this.parser = parser;
+ }
+
+ public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+ return new RawCommandInfo(this.installedPath + "qstat -u " + userName);
+ }
+
+ @Override
+ public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+ // For PBS there is no option to get jobDetails by JobName, so we search with userName
+ return new RawCommandInfo(this.installedPath + "qstat -u " + userName);
+ }
+
+ @Override
+ public String getBaseCancelCommand() {
+ return "qdel";
+ }
+
+ @Override
+ public String getBaseMonitorCommand() {
+ return "qstat";
+ }
+
+ @Override
+ public String getBaseSubmitCommand() {
+ return "qsub ";
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/AiravataCustomCommandOutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/AiravataCustomCommandOutputParser.java
new file mode 100644
index 0000000..c3a5a2e
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/AiravataCustomCommandOutputParser.java
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp.parser;
+
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class AiravataCustomCommandOutputParser implements OutputParser {
+ private static final Logger log = LoggerFactory.getLogger(AiravataCustomCommandOutputParser.class);
+
+ @Override
+ public String parseJobSubmission(String rawOutput) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isJobSubmissionFailed(String rawOutput) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JobStatus parseJobStatus(String jobID, String rawOutput) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String parseJobId(String jobName, String rawOutput) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/ForkOutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/ForkOutputParser.java
new file mode 100644
index 0000000..a4f48cc
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/ForkOutputParser.java
@@ -0,0 +1,58 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp.parser;
+
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class ForkOutputParser implements OutputParser {
+ private static final Logger log = LoggerFactory.getLogger(ForkOutputParser.class);
+
+ @Override
+ public String parseJobSubmission(String rawOutput) throws Exception {
+ return AiravataUtils.getId("JOB_ID_");
+ }
+
+ @Override
+ public boolean isJobSubmissionFailed(String rawOutput) {
+ return false;
+ }
+
+ @Override
+ public JobStatus parseJobStatus(String jobID, String rawOutput) throws Exception {
+ return null;
+ }
+
+ @Override
+ public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws Exception {
+
+ }
+
+ @Override
+ public String parseJobId(String jobName, String rawOutput) throws Exception {
+ // For fork jobs there is no job ID, hence airavata generates a job ID
+ return AiravataUtils.getId(jobName);
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/LSFOutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/LSFOutputParser.java
new file mode 100644
index 0000000..0bf812f
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/LSFOutputParser.java
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp.parser;
+
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class LSFOutputParser implements OutputParser {
+ private final static Logger logger = LoggerFactory.getLogger(LSFOutputParser.class);
+
+ @Override
+ public String parseJobSubmission(String rawOutput) throws Exception {
+ logger.debug(rawOutput);
+ if (rawOutput.indexOf("<") >= 0) {
+ return rawOutput.substring(rawOutput.indexOf("<")+1,rawOutput.indexOf(">"));
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public boolean isJobSubmissionFailed(String rawOutput) {
+ return false;
+ }
+
+ @Override
+ public JobStatus parseJobStatus(String jobID, String rawOutput) throws Exception {
+ boolean jobFount = false;
+ logger.debug(rawOutput);
+ //todo this is not used anymore
+ return null;
+ }
+
+ @Override
+ public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws Exception {
+ logger.debug(rawOutput);
+
+ String[] info = rawOutput.split("\n");
+// int lastStop = 0;
+ for (String jobID : statusMap.keySet()) {
+ String jobName = jobID.split(",")[1];
+ boolean found = false;
+ for (int i = 0; i < info.length; i++) {
+ if (info[i].contains(jobName.substring(0,8))) {
+ // now starts processing this line
+ logger.info(info[i]);
+ String correctLine = info[i];
+ String[] columns = correctLine.split(" ");
+ List<String> columnList = new ArrayList<String>();
+ for (String s : columns) {
+ if (!"".equals(s)) {
+ columnList.add(s);
+ }
+ }
+// lastStop = i + 1;
+ try {
+ statusMap.put(jobID, new JobStatus(JobState.valueOf(columnList.get(2))));
+ }catch(IndexOutOfBoundsException e) {
+ statusMap.put(jobID, new JobStatus(JobState.valueOf("U")));
+ }
+ found = true;
+ break;
+ }
+ }
+ if(!found)
+ logger.error("Couldn't find the status of the Job with JobName: " + jobName + "Job Id: " + jobID.split(",")[0]);
+ }
+ }
+
+ @Override
+ public String parseJobId(String jobName, String rawOutput) throws Exception {
+ String regJobId = "jobId";
+ Pattern pattern = Pattern.compile("(?=(?<" + regJobId + ">\\d+)\\s+\\w+\\s+" + jobName + ")"); // regex - look ahead and match
+ if (rawOutput != null) {
+ Matcher matcher = pattern.matcher(rawOutput);
+ if (matcher.find()) {
+ return matcher.group(regJobId);
+ } else {
+ logger.error("No match is found for JobName");
+ return null;
+ }
+ } else {
+ logger.error("Error: RawOutput shouldn't be null");
+ return null;
+ }
+ }
+
+ public static void main(String[] args) {
+ String test = "Job <2477982> is submitted to queue <short>.";
+ System.out.println(test.substring(test.indexOf("<")+1, test.indexOf(">")));
+ String test1 = "JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME\n" +
+ "2636607 lg11w RUN long ghpcc06 c11b02 *069656647 Mar 7 00:58\n" +
+ "2636582 lg11w RUN long ghpcc06 c02b01 2134490944 Mar 7 00:48";
+ Map<String, JobStatus> statusMap = new HashMap<String, JobStatus>();
+ statusMap.put("2477983,2134490944", new JobStatus(JobState.UNKNOWN));
+ LSFOutputParser lsfOutputParser = new LSFOutputParser();
+ try {
+ lsfOutputParser.parseJobStatuses("cjh", statusMap, test1);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+ System.out.println(statusMap.get("2477983,2134490944"));
+
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/PBSOutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/PBSOutputParser.java
new file mode 100644
index 0000000..3be8c8a
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/PBSOutputParser.java
@@ -0,0 +1,142 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp.parser;
+
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.imp.JobUtil;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class PBSOutputParser implements OutputParser {
+ private static final Logger log = LoggerFactory.getLogger(PBSOutputParser.class);
+
+ public String parseJobSubmission(String rawOutput) {
+ log.debug(rawOutput);
+ String jobId = rawOutput;
+ if (!rawOutput.isEmpty() && rawOutput.contains("\n")){
+ String[] split = rawOutput.split("\n");
+ if (split.length != 0){
+ jobId = split[0];
+ }
+ }
+ return jobId; //In PBS stdout is going to be directly the jobID
+ }
+
+ @Override
+ public boolean isJobSubmissionFailed(String rawOutput) {
+ return false;
+ }
+
+ public JobStatus parseJobStatus(String jobID, String rawOutput) {
+ boolean jobFount = false;
+ log.debug(rawOutput);
+ String[] info = rawOutput.split("\n");
+ String[] line = null;
+ int index = 0;
+ for (String anInfo : info) {
+ index++;
+ if (anInfo.contains("Job Id:")) {
+ if (anInfo.contains(jobID)) {
+ jobFount = true;
+ break;
+ }
+ }
+ }
+ if (jobFount) {
+ for (int i=index;i<info.length;i++) {
+ String anInfo = info[i];
+ if (anInfo.contains("=")) {
+ line = anInfo.split("=", 2);
+ if (line.length != 0) {
+ if (line[0].contains("job_state")) {
+ return new JobStatus(JobUtil.getJobState(line[1].replaceAll(" ", "")));
+ }
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) {
+ log.debug(rawOutput);
+ String[] info = rawOutput.split("\n");
+// int lastStop = 0;
+ for (String jobID : statusMap.keySet()) {
+ String jobName = jobID.split(",")[1];
+ boolean found = false;
+ for (int i = 0; i < info.length; i++) {
+ if (info[i].contains(jobName.substring(0,8))) {
+ // now starts processing this line
+ log.info(info[i]);
+ String correctLine = info[i];
+ String[] columns = correctLine.split(" ");
+ List<String> columnList = new ArrayList<String>();
+ for (String s : columns) {
+ if (!"".equals(s)) {
+ columnList.add(s);
+ }
+ }
+// lastStop = i + 1;
+ try {
+ statusMap.put(jobID, new JobStatus(JobUtil.getJobState(columnList.get(9))));
+ }catch(IndexOutOfBoundsException e) {
+ statusMap.put(jobID, new JobStatus(JobUtil.getJobState("U")));
+ }
+ found = true;
+ break;
+ }
+ }
+ if(!found)
+ log.error("Couldn't find the status of the Job with JobName: " + jobName + "Job Id: " + jobID.split(",")[0]);
+ }
+ }
+
+ @Override
+ public String parseJobId(String jobName, String rawOutput) throws Exception {
+ /* output will look like
+ Job Id: 2080802.gordon-fe2.local
+ Job_Name = A312402627
+ */
+ String regJobId = "jobId";
+ Pattern pattern = Pattern.compile("(?<" + regJobId + ">[^\\s]*)\\s*.* " + jobName);
+ if (rawOutput != null) {
+ Matcher matcher = pattern.matcher(rawOutput);
+ if (matcher.find()) {
+ return matcher.group(regJobId);
+ } else {
+ log.error("No match is found for JobName");
+ return null;
+ }
+ } else {
+ log.error("Error: RawOutput shouldn't be null");
+ return null;
+ }
+ }
+
+
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/SlurmOutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/SlurmOutputParser.java
new file mode 100644
index 0000000..3ebbcfd
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/SlurmOutputParser.java
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp.parser;
+
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.imp.JobUtil;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SlurmOutputParser implements OutputParser {
+ private static final Logger log = LoggerFactory.getLogger(SlurmOutputParser.class);
+ public static final int JOB_NAME_OUTPUT_LENGTH = 8;
+ public static final String STATUS = "status";
+ public static final String JOBID = "jobId";
+
+
+ /**
+ * This can be used to parseSingleJob the outpu of sbatch and extrac the jobID from the content
+ *
+ * @param rawOutput
+ * @return
+ */
+ public String parseJobSubmission(String rawOutput) throws Exception {
+ log.info(rawOutput);
+ Pattern pattern = Pattern.compile("Submitted batch job (?<" + JOBID + ">[^\\s]*)");
+ Matcher matcher = pattern.matcher(rawOutput);
+ if (matcher.find()) {
+ return matcher.group(JOBID);
+ }
+ return "";
+ }
+
+ @Override
+ public boolean isJobSubmissionFailed(String rawOutput) {
+ Pattern pattern = Pattern.compile("FAILED");
+ Matcher matcher = pattern.matcher(rawOutput);
+ return matcher.find();
+ }
+
+ public JobStatus parseJobStatus(String jobID, String rawOutput) throws Exception {
+ log.info(rawOutput);
+ Pattern pattern = Pattern.compile(jobID + "(?=\\s+\\S+\\s+\\S+\\s+\\S+\\s+(?<" + STATUS + ">\\w+))");
+ Matcher matcher = pattern.matcher(rawOutput);
+ if (matcher.find()) {
+ return new JobStatus(JobUtil.getJobState(matcher.group(STATUS)));
+ }
+ return null;
+ }
+
+ public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws Exception {
+ log.debug(rawOutput);
+ String[] info = rawOutput.split("\n");
+ String lastString = info[info.length - 1];
+ if (lastString.contains("JOBID") || lastString.contains("PARTITION")) {
+ log.info("There are no jobs with this username ... ");
+ return;
+ }
+// int lastStop = 0;
+ for (String jobID : statusMap.keySet()) {
+ String jobId = jobID.split(",")[0];
+ String jobName = jobID.split(",")[1];
+ boolean found = false;
+ for (int i = 0; i < info.length; i++) {
+ if (info[i].contains(jobName.substring(0, 8))) {
+ // now starts processing this line
+ log.info(info[i]);
+ String correctLine = info[i];
+ String[] columns = correctLine.split(" ");
+ List<String> columnList = new ArrayList<String>();
+ for (String s : columns) {
+ if (!"".equals(s)) {
+ columnList.add(s);
+ }
+ }
+ try {
+ statusMap.put(jobID, new JobStatus(JobState.valueOf(columnList.get(4))));
+ } catch (IndexOutOfBoundsException e) {
+ statusMap.put(jobID, new JobStatus(JobState.valueOf("U")));
+ }
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ log.error("Couldn't find the status of the Job with JobName: " + jobName + "Job Id: " + jobId);
+ }
+ }
+ }
+
+ @Override
+ public String parseJobId(String jobName, String rawOutput) throws Exception {
+ String regJobId = "jobId";
+ if (jobName == null) {
+ return null;
+ } else if(jobName.length() > JOB_NAME_OUTPUT_LENGTH) {
+ jobName = jobName.substring(0, JOB_NAME_OUTPUT_LENGTH);
+ }
+ Pattern pattern = Pattern.compile("(?=(?<" + regJobId + ">\\d+)\\s+\\w+\\s+" + jobName + ")"); // regex - look ahead and match
+ if (rawOutput != null) {
+ Matcher matcher = pattern.matcher(rawOutput);
+ if (matcher.find()) {
+ return matcher.group(regJobId);
+ } else {
+ log.error("No match is found for JobName");
+ return null;
+ }
+ } else {
+ log.error("Error: RawOutput shouldn't be null");
+ return null;
+ }
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/UGEOutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/UGEOutputParser.java
new file mode 100644
index 0000000..0f457ff
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/UGEOutputParser.java
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp.parser;
+
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.imp.parser.PBSOutputParser;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class UGEOutputParser implements OutputParser {
+ private static final Logger log = LoggerFactory.getLogger(PBSOutputParser.class);
+ public static final String JOB_ID = "jobId";
+
+ public String parseJobSubmission(String rawOutput) {
+ log.debug(rawOutput);
+ if (rawOutput != null && !rawOutput.isEmpty() && !isJobSubmissionFailed(rawOutput)) {
+ String[] info = rawOutput.split("\n");
+ String lastLine = info[info.length - 1];
+ return lastLine.split(" ")[2]; // In PBS stdout is going to be directly the jobID
+ } else {
+ return "";
+ }
+ }
+
+ @Override
+ public boolean isJobSubmissionFailed(String rawOutput) {
+ Pattern pattern = Pattern.compile("Rejecting");
+ Matcher matcher = pattern.matcher(rawOutput);
+ return matcher.find();
+ }
+
+ public JobStatus parseJobStatus(String jobID, String rawOutput) {
+ Pattern pattern = Pattern.compile("job_number:[\\s]+" + jobID);
+ Matcher matcher = pattern.matcher(rawOutput);
+ if (matcher.find()) {
+ return new JobStatus(JobState.QUEUED); // fixme; return correct status.
+ }
+ return new JobStatus(JobState.UNKNOWN);
+ }
+
+ public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) {
+ log.debug(rawOutput);
+ String[] info = rawOutput.split("\n");
+ int lastStop = 0;
+ for (String jobID : statusMap.keySet()) {
+ for(int i=lastStop;i<info.length;i++){
+ if(jobID.split(",")[0].contains(info[i].split(" ")[0]) && !"".equals(info[i].split(" ")[0])){
+ // now starts processing this line
+ log.info(info[i]);
+ String correctLine = info[i];
+ String[] columns = correctLine.split(" ");
+ List<String> columnList = new ArrayList<String>();
+ for (String s : columns) {
+ if (!"".equals(s)) {
+ columnList.add(s);
+ }
+ }
+ lastStop = i+1;
+ if ("E".equals(columnList.get(4))) {
+ // There is another status with the same letter E other than error status
+ // to avoid that we make a small tweek to the job status
+ columnList.set(4, "Er");
+ }
+ statusMap.put(jobID, new JobStatus(JobState.valueOf(columnList.get(4))));
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public String parseJobId(String jobName, String rawOutput) throws Exception {
+ if (jobName.length() > 10) {
+ jobName = jobName.substring(0, 10);
+ }
+ Pattern pattern = Pattern.compile("(?<" + JOB_ID + ">\\S+)\\s+\\S+\\s+(" + jobName + ")");
+ Matcher matcher = pattern.matcher(rawOutput);
+ if (matcher.find()) {
+ return matcher.group(JOB_ID);
+ }
+ return null;
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
new file mode 100644
index 0000000..fb9917f
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -0,0 +1,232 @@
+package org.apache.airavata.helix.impl.task.submission.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.JobSubmissionOutput;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.submission.GroovyMapData;
+import org.apache.airavata.helix.impl.task.submission.SubmissionUtil;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@TaskDef(name = "Default Job Submission")
+public class DefaultJobSubmissionTask extends JobSubmissionTask {
+
+ private static final Logger logger = LogManager.getLogger(DefaultJobSubmissionTask.class);
+
+ public static final String DEFAULT_JOB_ID = "DEFAULT_JOB_ID";
+
+ @Override
+ public TaskResult onRun(TaskHelper taskHelper) {
+ try {
+ GroovyMapData groovyMapData = new GroovyMapData();
+
+
+ JobModel jobModel = new JobModel();
+ jobModel.setProcessId(getProcessId());
+ jobModel.setWorkingDir(groovyMapData.getWorkingDirectory());
+ jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setTaskId(getTaskId());
+ jobModel.setJobName(groovyMapData.getJobName());
+
+ File jobFile = SubmissionUtil.createJobFile(groovyMapData);
+
+
+ if (jobFile != null && jobFile.exists()) {
+ jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(),
+ getJobSubmissionProtocol().name(), getComputeResourceCredentialToken());
+
+ JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, jobFile, groovyMapData.getWorkingDirectory());
+
+ jobModel.setExitCode(submissionOutput.getExitCode());
+ jobModel.setStdErr(submissionOutput.getStdErr());
+ jobModel.setStdOut(submissionOutput.getStdOut());
+
+ String jobId = submissionOutput.getJobId();
+
+ if (submissionOutput.getExitCode() != 0 || submissionOutput.isJobSubmissionFailed()) {
+ jobModel.setJobId(DEFAULT_JOB_ID);
+ if (submissionOutput.isJobSubmissionFailed()) {
+ List<JobStatus> statusList = new ArrayList<>();
+ statusList.add(new JobStatus(JobState.FAILED));
+ statusList.get(0).setReason(submissionOutput.getFailureReason());
+ jobModel.setJobStatuses(statusList);
+ saveJobModel(jobModel);
+ logger.error("expId: " + getExperimentId() + ", processid: " + getProcessId()+ ", taskId: " +
+ getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName());
+
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setUserFriendlyMessage(submissionOutput.getFailureReason());
+ errorModel.setActualErrorMessage(submissionOutput.getFailureReason());
+ saveExperimentError(errorModel);
+ saveProcessError(errorModel);
+ saveTaskError(errorModel);
+ //taskStatus.setState(TaskState.FAILED);
+ //taskStatus.setReason("Job submission command didn't return a jobId");
+ //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ //taskContext.setTaskStatus(taskStatus);
+ return onFail("Job submission command didn't return a jobId", false, null);
+
+ } else {
+ String msg;
+ saveJobModel(jobModel);
+ ErrorModel errorModel = new ErrorModel();
+ if (submissionOutput.getExitCode() != Integer.MIN_VALUE) {
+ msg = "expId:" + getExperimentId() + ", processId:" + getProcessId() + ", taskId: " + getTaskId() +
+ " return non zero exit code:" + submissionOutput.getExitCode() + " for JobName:" + jobModel.getJobName() +
+ ", with failure reason : " + submissionOutput.getFailureReason()
+ + " Hence changing job state to Failed." ;
+ errorModel.setActualErrorMessage(submissionOutput.getFailureReason());
+ } else {
+ msg = "expId:" + getExperimentId() + ", processId:" + getProcessId() + ", taskId: " + getTaskId() +
+ " doesn't return valid job submission exit code for JobName:" + jobModel.getJobName() +
+ ", with failure reason : stdout ->" + submissionOutput.getStdOut() +
+ " stderr -> " + submissionOutput.getStdErr() + " Hence changing job state to Failed." ;
+ errorModel.setActualErrorMessage(msg);
+ }
+ logger.error(msg);
+ errorModel.setUserFriendlyMessage(msg);
+ saveExperimentError(errorModel);
+ saveProcessError(errorModel);
+ saveTaskError(errorModel);
+ //taskStatus.setState(TaskState.FAILED);
+ //taskStatus.setReason(msg);
+ //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ //taskContext.setTaskStatus(taskStatus);
+ return onFail(msg, false, null);
+ }
+
+ //TODO save task status??
+ } else if (jobId != null && !jobId.isEmpty()) {
+ jobModel.setJobId(jobId);
+ saveJobModel(jobModel);
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobState(JobState.SUBMITTED);
+ jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatuses(Arrays.asList(jobStatus));
+ saveJobStatus(jobModel);
+
+ if (verifyJobSubmissionByJobId(adaptor, jobId)) {
+ jobStatus.setJobState(JobState.QUEUED);
+ jobStatus.setReason("Verification step succeeded");
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatuses(Arrays.asList(jobStatus));
+ saveJobStatus(jobModel);
+ }
+
+ if (getComputeResourceDescription().isGatewayUsageReporting()){
+ String loadCommand = getComputeResourceDescription().getGatewayUsageModuleLoadCommand();
+ String usageExecutable = getComputeResourceDescription().getGatewayUsageExecutable();
+ ExperimentModel experiment = (ExperimentModel)getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, getExperimentId());
+ String username = experiment.getUserName() + "@" + getGatewayComputeResourcePreference().getUsageReportingGatewayId();
+ RawCommandInfo rawCommandInfo = new RawCommandInfo(loadCommand + " && " + usageExecutable + " -gateway_user " + username +
+ " -submit_time \"`date '+%F %T %:z'`\" -jobid " + jobId );
+ adaptor.executeCommand(rawCommandInfo.getRawCommand(), null);
+ }
+ //taskStatus = new TaskStatus(TaskState.COMPLETED);
+ //taskStatus.setReason("Submitted job to compute resource");
+ //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+
+ return onSuccess("Submitted job to compute resource");
+ } else {
+ int verificationTryCount = 0;
+ while (verificationTryCount++ < 3) {
+ String verifyJobId = verifyJobSubmission(adaptor, jobModel.getJobName(), getComputeResourceLoginUserName());
+ if (verifyJobId != null && !verifyJobId.isEmpty()) {
+ // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+ jobId = verifyJobId;
+ jobModel.setJobId(jobId);
+ saveJobModel(jobModel);
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobState(JobState.QUEUED);
+ jobStatus.setReason("Verification step succeeded");
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatuses(Arrays.asList(jobStatus));
+ saveJobStatus(jobModel);
+ //taskStatus.setState(TaskState.COMPLETED);
+ //taskStatus.setReason("Submitted job to compute resource");
+ //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ break;
+ }
+ logger.info("Verify step return invalid jobId, retry verification step in " + (verificationTryCount * 10) + " secs");
+ Thread.sleep(verificationTryCount * 10000);
+ }
+ }
+
+ if (jobId == null || jobId.isEmpty()) {
+ jobModel.setJobId(DEFAULT_JOB_ID);
+ saveJobModel(jobModel);
+ String msg = "expId:" + getExperimentId() + " Couldn't find " +
+ "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
+ "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
+ logger.error(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setUserFriendlyMessage(msg);
+ errorModel.setActualErrorMessage(msg);
+ saveExperimentError(errorModel);
+ saveProcessError(errorModel);
+ saveTaskError(errorModel);
+ //taskStatus.setState(TaskState.FAILED);
+ //taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
+ //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ return onFail("Couldn't find job id in both submitted and verified steps", false, null);
+ }else {
+ //GFacUtils.saveJobModel(processContext, jobModel);
+ }
+
+ } else {
+ //taskStatus.setState(TaskState.FAILED);
+ if (jobFile == null) {
+ return onFail("Job file is null", true, null);
+ // taskStatus.setReason("JobFile is null");
+ } else {
+ //taskStatus.setReason("Job file doesn't exist");
+ return onFail("Job file doesn't exist", true, null);
+ }
+ }
+ } catch (Exception e) {
+ return onFail("Task failed due to unexpected issue", false, null);
+ }
+ // TODO get rid of this
+ return onFail("Task moved to an unknown state", false, null);
+ }
+
+ private boolean verifyJobSubmissionByJobId(AgentAdaptor agentAdaptor, String jobID) throws Exception {
+ JobStatus status = getJobStatus(agentAdaptor, jobID);
+ return status != null && status.getJobState() != JobState.UNKNOWN;
+ }
+
+ private String verifyJobSubmission(AgentAdaptor agentAdaptor, String jobName, String userName) {
+ String jobId = null;
+ try {
+ jobId = getJobIdByJobName(agentAdaptor, jobName, userName);
+ } catch (Exception e) {
+ logger.error("Error while verifying JobId from JobName " + jobName);
+ }
+ return jobId;
+ }
+
+ @Override
+ public void onCancel() {
+
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
new file mode 100644
index 0000000..da04365
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
@@ -0,0 +1,79 @@
+package org.apache.airavata.helix.impl.task.submission.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.JobSubmissionOutput;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.submission.GroovyMapData;
+import org.apache.airavata.helix.impl.task.submission.SubmissionUtil;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskResult;
+
+import java.io.File;
+import java.util.Arrays;
+
+@TaskDef(name = "Fork Job Submission")
+public class ForkJobSubmissionTask extends JobSubmissionTask {
+
+ @Override
+ public TaskResult onRun(TaskHelper taskHelper) {
+
+ try {
+ GroovyMapData groovyMapData = new GroovyMapData();
+
+ JobModel jobModel = new JobModel();
+ jobModel.setProcessId(getProcessId());
+ jobModel.setWorkingDir(groovyMapData.getWorkingDirectory());
+ jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setTaskId(getTaskId());
+ jobModel.setJobName(groovyMapData.getJobName());
+
+ File jobFile = SubmissionUtil.createJobFile(groovyMapData);
+
+ if (jobFile != null && jobFile.exists()) {
+ jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(),
+ getJobSubmissionProtocol().name(), getComputeResourceCredentialToken());
+
+ JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, jobFile, groovyMapData.getWorkingDirectory());
+
+ jobModel.setExitCode(submissionOutput.getExitCode());
+ jobModel.setStdErr(submissionOutput.getStdErr());
+ jobModel.setStdOut(submissionOutput.getStdOut());
+
+ String jobId = submissionOutput.getJobId();
+
+ if (jobId != null && !jobId.isEmpty()) {
+ jobModel.setJobId(jobId);
+ saveJobModel(jobModel);
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobState(JobState.SUBMITTED);
+ jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatuses(Arrays.asList(jobStatus));
+ saveJobStatus(jobModel);
+
+ return null;
+ } else {
+ String msg = "expId:" + getExperimentId() + " Couldn't find remote jobId for JobName:" +
+ jobModel.getJobName() + ", both submit and verify steps doesn't return a valid JobId. " +
+ "Hence changing experiment state to Failed";
+ }
+
+ }
+ return null;
+
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ @Override
+ public void onCancel() {
+
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
new file mode 100644
index 0000000..fe5a3dc
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
@@ -0,0 +1,202 @@
+package org.apache.airavata.helix.impl.task.submission.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.agents.api.JobSubmissionOutput;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.helix.HelixManager;
+
+import java.io.File;
+import java.util.*;
+
+public abstract class JobSubmissionTask extends AiravataTask {
+
+
+
+ @Override
+ public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
+ super.init(manager, workflowName, jobName, taskName);
+ }
+
+ //////////////////////
+ protected JobSubmissionOutput submitBatchJob(AgentAdaptor agentAdaptor, File jobFile, String workingDirectory) throws Exception {
+ JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
+ getAppCatalog(), getJobSubmissionProtocol(), getPreferredJobSubmissionInterface()));
+ RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobFile.getPath());
+ CommandOutput commandOutput = agentAdaptor.executeCommand(submitCommand.getRawCommand(), workingDirectory);
+
+ JobSubmissionOutput jsoutput = new JobSubmissionOutput();
+
+ jsoutput.setJobId(jobManagerConfiguration.getParser().parseJobSubmission(commandOutput.getStdOut()));
+ if (jsoutput.getJobId() == null) {
+ if (jobManagerConfiguration.getParser().isJobSubmissionFailed(commandOutput.getStdOut())) {
+ jsoutput.setJobSubmissionFailed(true);
+ jsoutput.setFailureReason("stdout : " + commandOutput.getStdOut() +
+ "\n stderr : " + commandOutput.getStdError());
+ }
+ }
+ jsoutput.setExitCode(commandOutput.getExitCode());
+ if (jsoutput.getExitCode() != 0) {
+ jsoutput.setJobSubmissionFailed(true);
+ jsoutput.setFailureReason("stdout : " + commandOutput.getStdOut() +
+ "\n stderr : " + commandOutput.getStdError());
+ }
+ jsoutput.setStdOut(commandOutput.getStdOut());
+ jsoutput.setStdErr(commandOutput.getStdError());
+ return jsoutput;
+
+ }
+
+ public JobStatus getJobStatus(AgentAdaptor agentAdaptor, String jobID) throws Exception {
+ JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
+ getAppCatalog(), getJobSubmissionProtocol(), getPreferredJobSubmissionInterface()));
+ CommandOutput commandOutput = agentAdaptor.executeCommand(jobManagerConfiguration.getMonitorCommand(jobID).getRawCommand(), null);
+
+ return jobManagerConfiguration.getParser().parseJobStatus(jobID, commandOutput.getStdOut());
+
+ }
+
+ public String getJobIdByJobName(AgentAdaptor agentAdaptor, String jobName, String userName) throws Exception {
+ JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
+ getAppCatalog(), getJobSubmissionProtocol(), getPreferredJobSubmissionInterface()));
+
+ RawCommandInfo jobIdMonitorCommand = jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName);
+ CommandOutput commandOutput = agentAdaptor.executeCommand(jobIdMonitorCommand.getRawCommand(), null);
+ return jobManagerConfiguration.getParser().parseJobId(jobName, commandOutput.getStdOut());
+ }
+
+ ////////////////////////////////
+
+
+ /////////////////////////////////////////////
+ public void saveExperimentError(ErrorModel errorModel) throws Exception {
+ try {
+ errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
+ getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, getExperimentId());
+ } catch (RegistryException e) {
+ String msg = "expId: " + getExperimentId() + " processId: " + getProcessId()
+ + " : - Error while updating experiment errors";
+ throw new Exception(msg, e);
+ }
+ }
+
+ public void saveProcessError(ErrorModel errorModel) throws Exception {
+ try {
+ errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
+ getExperimentCatalog().add(ExpCatChildDataType.PROCESS_ERROR, errorModel, getProcessId());
+ } catch (RegistryException e) {
+ String msg = "expId: " + getExperimentId() + " processId: " + getProcessId()
+ + " : - Error while updating process errors";
+ throw new Exception(msg, e);
+ }
+ }
+
+ public void saveTaskError(ErrorModel errorModel) throws Exception {
+ try {
+ errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR"));
+ getExperimentCatalog().add(ExpCatChildDataType.TASK_ERROR, errorModel, getTaskId());
+ } catch (RegistryException e) {
+ String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " taskId: " + getTaskId()
+ + " : - Error while updating task errors";
+ throw new Exception(msg, e);
+ }
+ }
+
+ public void saveJobModel(JobModel jobModel) throws RegistryException {
+ getExperimentCatalog().add(ExpCatChildDataType.JOB, jobModel, getProcessId());
+ }
+
+ public void saveJobStatus(JobModel jobModel) throws Exception {
+ try {
+ // first we save job jobModel to the registry for sa and then save the job status.
+ JobStatus jobStatus = null;
+ if(jobModel.getJobStatuses() != null)
+ jobStatus = jobModel.getJobStatuses().get(0);
+
+ List<JobStatus> statuses = new ArrayList<>();
+ statuses.add(jobStatus);
+ jobModel.setJobStatuses(statuses);
+
+ if (jobStatus.getTimeOfStateChange() == 0 || jobStatus.getTimeOfStateChange() > 0 ) {
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ } else {
+ jobStatus.setTimeOfStateChange(jobStatus.getTimeOfStateChange());
+ }
+
+ CompositeIdentifier ids = new CompositeIdentifier(jobModel.getTaskId(), jobModel.getJobId());
+ getExperimentCatalog().add(ExpCatChildDataType.JOB_STATUS, jobStatus, ids);
+ JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), jobModel.getTaskId(),
+ getProcessId(), getProcessModel().getExperimentId(), getGatewayId());
+
+ JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
+ MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
+ (MessageType.JOB.name()), getGatewayId());
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ getStatusPublisher().publish(msgCtx);
+ } catch (Exception e) {
+ throw new Exception("Error persisting job status" + e.getLocalizedMessage(), e);
+ }
+ }
+
+ ///////////// required for groovy map
+
+ private String workingDir;
+ private String scratchLocation;
+ private UserComputeResourcePreference userComputeResourcePreference;
+
+ public String getWorkingDir() {
+ if (workingDir == null) {
+ if (getProcessModel().getProcessResourceSchedule().getStaticWorkingDir() != null){
+ workingDir = getProcessModel().getProcessResourceSchedule().getStaticWorkingDir();
+ }else {
+ String scratchLocation = getScratchLocation();
+ workingDir = (scratchLocation.endsWith("/") ? scratchLocation + getProcessId() : scratchLocation + "/" +
+ getProcessId());
+ }
+ }
+ return workingDir;
+ }
+
+ public String getScratchLocation() {
+ if (scratchLocation == null) {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getScratchLocation())) {
+ scratchLocation = userComputeResourcePreference.getScratchLocation();
+ } else if (isValid(processModel.getProcessResourceSchedule().getOverrideScratchLocation())) {
+ scratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation();
+ }else {
+ scratchLocation = gatewayComputeResourcePreference.getScratchLocation();
+ }
+ }
+ return scratchLocation;
+ }
+
+ protected UserComputeResourcePreference userComputeResourcePreference() throws AppCatalogException {
+ UserComputeResourcePreference userComputeResourcePreference =
+ getAppCatalog().getUserResourceProfile().getUserComputeResourcePreference(
+ getProcessModel().getUserName(),
+ getGatewayId(),
+ getProcessModel().getComputeResourceId());
+ }
+
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
new file mode 100644
index 0000000..5a3ca31
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
@@ -0,0 +1,81 @@
+package org.apache.airavata.helix.impl.task.submission.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.JobSubmissionOutput;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.submission.GroovyMapData;
+import org.apache.airavata.helix.impl.task.submission.SubmissionUtil;
+import org.apache.airavata.helix.impl.task.submission.task.JobSubmissionTask;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskResult;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.UUID;
+
+@TaskDef(name = "Local Job Submission")
+public class LocalJobSubmissionTask extends JobSubmissionTask {
+
+ @Override
+ public TaskResult onRun(TaskHelper taskHelper) {
+
+ try {
+ GroovyMapData groovyMapData = new GroovyMapData();
+ String jobId = "JOB_ID_" + UUID.randomUUID().toString();
+
+ JobModel jobModel = new JobModel();
+ jobModel.setProcessId(getProcessId());
+ jobModel.setWorkingDir(groovyMapData.getWorkingDirectory());
+ jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setTaskId(getTaskId());
+ jobModel.setJobId(jobId);
+
+ File jobFile = SubmissionUtil.createJobFile(groovyMapData);
+
+ if (jobFile != null && jobFile.exists()) {
+ jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+ saveJobModel(jobModel);
+
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(),
+ getJobSubmissionProtocol().name(), getComputeResourceCredentialToken());
+
+ JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, jobFile, groovyMapData.getWorkingDirectory());
+
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobState(JobState.SUBMITTED);
+ jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatuses(Arrays.asList(jobStatus));
+
+ saveJobStatus(jobModel);
+
+ jobModel.setExitCode(submissionOutput.getExitCode());
+ jobModel.setStdErr(submissionOutput.getStdErr());
+ jobModel.setStdOut(submissionOutput.getStdOut());
+
+ jobStatus.setJobState(JobState.COMPLETE);
+ jobStatus.setReason("Successfully Completed " + getComputeResourceDescription().getHostName());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatuses(Arrays.asList(jobStatus));
+
+ saveJobStatus(jobModel);
+
+ return null;
+ }
+
+ return null;
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ @Override
+ public void onCancel() {
+
+ }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
new file mode 100644
index 0000000..51feff4
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
@@ -0,0 +1,31 @@
+package org.apache.airavata.helix.impl.workflow;
+
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.impl.task.EnvSetupTask;
+import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask;
+import org.apache.airavata.helix.workflow.WorkflowManager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+public class SimpleWorkflow {
+
+ public static void main(String[] args) throws Exception {
+
+ EnvSetupTask envSetupTask = new EnvSetupTask();
+ envSetupTask.setWorkingDirectory("/tmp/a");
+
+ DefaultJobSubmissionTask defaultJobSubmissionTask = new DefaultJobSubmissionTask();
+ defaultJobSubmissionTask.setGatewayId("default");
+ defaultJobSubmissionTask.setExperimentId("Clone_of_Mothur-Test1_0c9f627e-2c32-403e-a28a-2a8b10c21c1a");
+ defaultJobSubmissionTask.setProcessId("PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6");
+ defaultJobSubmissionTask.setTaskId(UUID.randomUUID().toString());
+
+ List<AbstractTask> tasks = new ArrayList<>();
+ tasks.add(defaultJobSubmissionTask);
+
+ WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22", "localhost:2199");
+ workflowManager.launchWorkflow(UUID.randomUUID().toString(), tasks, true);
+ }
+}
diff --git a/modules/helix-spectator/src/main/resources/airavata-server.properties b/modules/helix-spectator/src/main/resources/airavata-server.properties
new file mode 100644
index 0000000..5f47d79
--- /dev/null
+++ b/modules/helix-spectator/src/main/resources/airavata-server.properties
@@ -0,0 +1,334 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+###########################################################################
+#
+# This properties file provides configuration for all Airavata Services:
+# API Server, Registry, Workflow Interpreter, GFac, Orchestrator
+#
+###########################################################################
+
+###########################################################################
+# API Server Registry Configuration
+###########################################################################
+
+#for derby [AiravataJPARegistry]
+#registry.jdbc.driver=org.apache.derby.jdbc.ClientDriver
+#registry.jdbc.url=jdbc:derby://localhost:1527/experiment_catalog;create=true;user=airavata;password=airavata
+# MariaDB database configuration
+registry.jdbc.driver=org.mariadb.jdbc.Driver
+registry.jdbc.url=jdbc:mariadb://149.165.168.248:3306/experiment_catalog
+registry.jdbc.user=eroma
+registry.jdbc.password=eroma123456
+#FIXME: Probably the following property should be removed.
+start.derby.server.mode=false
+validationQuery=SELECT 1 from CONFIGURATION
+cache.enable=false
+jpa.cache.size=-1
+#jpa.connection.properties=MaxActive=10,MaxIdle=5,MinIdle=2,MaxWait=60000,testWhileIdle=true,testOnBorrow=true
+enable.sharing=true
+
+# Properties for default user mode
+default.registry.user=default-admin
+default.registry.password=123456
+default.registry.password.hash.method=SHA
+default.registry.gateway=default
+super.tenant.gatewayId=default
+
+# Properties for cluster status monitoring
+# cluster status monitoring job repeat time in seconds
+cluster.status.monitoring.enable=false
+cluster.status.monitoring.repeat.time=18000
+
+###########################################################################
+# Application Catalog DB Configuration
+###########################################################################
+#for derby [AiravataJPARegistry]
+#appcatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver
+#appcatalog.jdbc.url=jdbc:derby://localhost:1527/app_catalog;create=true;user=airavata;password=airavata
+# MariaDB database configuration
+appcatalog.jdbc.driver=org.mariadb.jdbc.Driver
+appcatalog.jdbc.url=jdbc:mariadb://149.165.168.248:3306/app_catalog
+appcatalog.jdbc.user=eroma
+appcatalog.jdbc.password=eroma123456
+appcatalog.validationQuery=SELECT 1 from CONFIGURATION
+
+##########################################################################
+# Replica Catalog DB Configuration
+###########################################################################
+#for derby [AiravataJPARegistry]
+#replicacatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver
+#replicacatalog.jdbc.url=jdbc:derby://localhost:1527/replica_catalog;create=true;user=airavata;password=airavata
+# MariaDB database configuration
+replicacatalog.jdbc.driver=org.mariadb.jdbc.Driver
+replicacatalog.jdbc.url=jdbc:mariadb://149.165.168.248:3306/replica_catalog
+replicacatalog.jdbc.user=eroma
+replicacatalog.jdbc.password=eroma123456
+replicacatalog.validationQuery=SELECT 1 from CONFIGURATION
+
+###########################################################################
+# Workflow Catalog DB Configuration
+###########################################################################
+#for derby [AiravataJPARegistry]
+#workflowcatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver
+#workflowcatalog.jdbc.url=jdbc:derby://localhost:1527/workflow_catalog;create=true;user=airavata;password=airavata
+# MariaDB database configuration
+workflowcatalog.jdbc.driver=org.mariadb.jdbc.Driver
+workflowcatalog.jdbc.url=jdbc:mariadb://149.165.168.248:3306/replica_catalog
+workflowcatalog.jdbc.user=eroma
+workflowcatalog.jdbc.password=eroma123456
+workflowcatalog.validationQuery=SELECT 1 from CONFIGURATION
+
+###########################################################################
+# Sharing Catalog DB Configuration
+###########################################################################
+#for derby [AiravataJPARegistry]
+#sharingcatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver
+#sharingcatalog.jdbc.url=jdbc:derby://localhost:1527/sharing_catalog;create=true;user=airavata;password=airavata
+# MariaDB database configuration
+sharingcatalog.jdbc.driver=org.mariadb.jdbc.Driver
+sharingcatalog.jdbc.url=jdbc:mariadb://149.165.168.248:3306/sharing_catalog
+sharingcatalog.jdbc.user=eroma
+sharingcatalog.jdbc.password=eroma123456
+sharingcatalog.validationQuery=SELECT 1 from CONFIGURATION
+
+###########################################################################
+# Sharing Registry Server Configuration
+###########################################################################
+sharing_server=org.apache.airavata.sharing.registry.server.SharingRegistryServer
+sharing.registry.server.host=192.168.99.102
+sharing.registry.server.port=7878
+
+###########################################################################
+# User Profile MongoDB Configuration
+###########################################################################
+userprofile.mongodb.host=localhost
+userprofile.mongodb.port=27017
+
+
+###########################################################################
+# Server module Configuration
+###########################################################################
+#credential store server should be started before API server
+#This is obsolete property with new script files.
+#servers=credentialstore,apiserver,orchestrator
+
+
+###########################################################################
+# API Server Configurations
+###########################################################################
+apiserver=org.apache.airavata.api.server.AiravataAPIServer
+apiserver.name=apiserver-node0
+apiserver.host=192.168.99.102
+apiserver.port=8930
+apiserver.min.threads=50
+
+###########################################################################
+# Orchestrator Server Configurations
+###########################################################################
+orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
+orchestrator.server.name=orchestrator-node0
+orchestrator.server.host=192.168.99.102
+orchestrator.server.port=8940
+orchestrator.server.min.threads=50
+job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
+submitter.interval=10000
+threadpool.size=10
+start.submitter=true
+embedded.mode=true
+enable.validation=true
+
+###########################################################################
+# Registry Server Configurations
+###########################################################################
+regserver=org.apache.airavata.registry.api.service.RegistryAPIServer
+regserver.server.name=regserver-node0
+regserver.server.host=192.168.99.102
+regserver.server.port=8970
+regserver.server.min.threads=50
+
+###########################################################################
+# GFac Server Configurations
+###########################################################################
+gfac=org.apache.airavata.gfac.server.GfacServer
+gfac.server.name=gfac-node0
+gfac.server.host=10.0.2.15
+gfac.server.port=8950
+gfac.thread.pool.size=50
+host.scheduler=org.apache.airavata.gfac.impl.DefaultHostScheduler
+
+
+
+###########################################################################
+# Airavata Workflow Interpreter Configurations
+###########################################################################
+workflowserver=org.apache.airavata.api.server.WorkflowServer
+enactment.thread.pool.size=10
+
+#to define custom workflow parser user following property
+#workflow.parser=org.apache.airavata.workflow.core.parser.AiravataWorkflowBuilder
+
+
+
+###########################################################################
+# Job Scheduler can send informative email messages to you about the status of your job.
+# Specify a string which consists of either the single character "n" (no mail), or one or more
+# of the characters "a" (send mail when job is aborted), "b" (send mail when job begins),
+# and "e" (send mail when job terminates). The default is "a" if not specified.
+###########################################################################
+
+job.notification.enable=true
+#Provide comma separated email ids as a string if more than one
+job.notification.emailids=
+job.notification.flags=abe
+
+###########################################################################
+# Credential Store module Configuration
+###########################################################################
+credential.store.keystore.url=/home/pga/master-deployment/keystores/cred_store.jks
+credential.store.keystore.alias=seckey
+credential.store.keystore.password=123456
+credential.store.jdbc.url=jdbc:mariadb://149.165.168.248:3306/credential_store
+credential.store.jdbc.user=eroma
+credential.store.jdbc.password=eroma123456
+credential.store.jdbc.driver=org.mariadb.jdbc.Driver
+credential.store.server.host=192.168.99.102
+credential.store.server.port=8960
+credentialstore=org.apache.airavata.credential.store.server.CredentialStoreServer
+credential.stroe.jdbc.validationQuery=SELECT 1 from CONFIGURATION
+
+# these properties used by credential store email notifications
+email.server=smtp.googlemail.com
+email.server.port=465
+email.user=airavata
+email.password=xxx
+email.ssl=true
+email.from=airavata@apache.org
+
+# SSH PKI key pair or ssh password can be used SSH based sshKeyAuthentication is used.
+# if user specify both password sshKeyAuthentication gets the higher preference
+
+################# ---------- For ssh key pair sshKeyAuthentication ------------------- ################
+#ssh.public.key=/path to public key for ssh
+#ssh.private.key=/path to private key file for ssh
+#ssh.keypass=passphrase for the private key
+#ssh.username=username for ssh connection
+## If you set "yes" for ssh.strict.hostKey.checking, then you must provide known hosts file path
+#ssh.strict.hostKey.checking=yes/no
+#ssh.known.hosts.file=/path to known hosts file
+### Incase of password sshKeyAuthentication.
+#ssh.password=Password for ssh connection
+
+################ ---------- BES Properties ------------------- ###############
+#bes.ca.cert.path=<location>/certificates/cacert.pem
+#bes.ca.key.path=<location>/certificates/cakey.pem
+#bes.ca.key.pass=passphrase
+
+###########################################################################
+# Monitoring module Configuration
+###########################################################################
+
+#This will be the primary monitoring tool which runs in airavata, in future there will be multiple monitoring
+#mechanisms and one would be able to start a monitor
+monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.gfac.monitor.impl.LocalJobMonitor
+
+#These properties will used to enable email base monitoring
+email.based.monitor.host=imap.gmail.com
+email.based.monitor.address=ansibletestairavata@gmail.com
+email.based.monitor.password=ansibletestairavata123
+email.based.monitor.folder.name=INBOX
+# either imaps or pop3
+email.based.monitor.store.protocol=imaps
+#These property will be used to query the email server periodically. value in milliseconds(ms).
+email.based.monitoring.period=10000
+
+###########################################################################
+# AMQP Notification Configuration
+###########################################################################
+#for simple scenarios we can use the guest user
+#rabbitmq.broker.url=amqp://localhost:5672
+#for production scenarios, give url as amqp://userName:password@hostName:portNumber/virtualHost, create user, virtualhost
+# and give permissions, refer: http://blog.dtzq.com/2012/06/rabbitmq-users-and-virtual-hosts.html
+rabbitmq.broker.url=amqp://airavata:123456@192.168.99.102:5672/master
+rabbitmq.status.exchange.name=status_exchange
+rabbitmq.process.exchange.name=process_exchange
+rabbitmq.experiment.exchange.name=experiment_exchange
+durable.queue=false
+prefetch.count=200
+process.launch.queue.name=process.launch.queue
+experiment.launch..queue.name=experiment.launch.queue
+
+###########################################################################
+# Zookeeper Server Configuration
+###########################################################################
+embedded.zk=false
+zookeeper.server.connection=192.168.99.102:2181
+zookeeper.timeout=30000
+
+########################################################################
+## API Security Configuration
+########################################################################
+api.secured=false
+security.manager.class=org.apache.airavata.service.security.KeyCloakSecurityManager
+### TLS related configuration ####
+TLS.enabled=true
+TLS.api.server.port=9930
+TLS.client.timeout=10000
+#### keystore configuration ####
+keystore.path=/home/pga/master-deployment/keystores/airavata.jks
+keystore.password=password
+#### trust store configuration ####
+trust.store=/home/pga/master-deployment/keystores/client_truststore.jks
+trust.store.password=password
+#### authorization cache related configuration ####
+authz.cache.enabled=true
+authz.cache.manager.class=org.apache.airavata.service.security.authzcache.DefaultAuthzCacheManager
+in.memory.cache.size=1000
+
+# Kafka Logging related configuration
+isRunningOnAws=false
+kafka.broker.list=localhost:9092
+kafka.topic.prefix=local
+enable.kafka.logging=false
+
+###########################################################################
+# Profile Service Configuration
+###########################################################################
+profile.service.server.host=192.168.99.102
+profile.service.server.port=8962
+profile_service=org.apache.airavata.service.profile.server.ProfileServiceServer
+# MariaDB properties
+profile.service.jdbc.url=jdbc:mariadb://149.165.168.248:3306/profile_service
+profile.service.jdbc.user=eroma
+profile.service.jdbc.password=eroma123456
+profile.service.jdbc.driver=org.mariadb.jdbc.Driver
+profile.service.validationQuery=SELECT 1
+
+###########################################################################
+# Iam Admin services Configuration
+###########################################################################
+iam.server.url=https://192.168.99.102/auth
+iam.server.super.admin.username=admin
+iam.server.super.admin.password=123456
+
+###########################################################################
+# DB Event Manager Runner
+###########################################################################
+db_event_manager=org.apache.airavata.db.event.manager.DBEventManagerRunner
diff --git a/modules/helix-spectator/src/main/resources/application.properties b/modules/helix-spectator/src/main/resources/application.properties
new file mode 100644
index 0000000..41c5e5f
--- /dev/null
+++ b/modules/helix-spectator/src/main/resources/application.properties
@@ -0,0 +1,3 @@
+zookeeper.connection.url=localhost:2199
+helix.cluster.name=AiravataDemoCluster
+participant.name=all-p1
\ No newline at end of file
diff --git a/modules/helix-spectator/src/main/resources/log4j.properties b/modules/helix-spectator/src/main/resources/log4j.properties
new file mode 100644
index 0000000..e910f32
--- /dev/null
+++ b/modules/helix-spectator/src/main/resources/log4j.properties
@@ -0,0 +1,11 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+log4j.category.org.apache.helix=WARN
+log4j.category.org.apache.zookeeper=WARN
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 91202d7..bd99a2d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -696,6 +696,8 @@
<module>modules/distribution</module>
<!--<module>modules/test-suite</module>-->
<module>modules/compute-account-provisioning</module>
+ <module>modules/airavata-helix</module>
+ <module>modules/helix-spectator</module>
</modules>
</profile>
<profile>
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.