Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:09:58 UTC

[airavata] 01/17: Initial helix migration

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit c9a1b064bb71dd936bb7f480bf4d7703ba7ad9a0
Author: dimuthu <di...@gmail.com>
AuthorDate: Wed Feb 21 13:31:29 2018 -0500

    Initial helix migration
---
 modules/airavata-helix/agent-api/pom.xml           |  47 +++
 .../apache/airavata/agents/api/AdaptorParams.java  |  26 ++
 .../java/org/apache/airavata/agents/api/Agent.java |  10 +
 .../apache/airavata/agents/api/AgentAdaptor.java   |  23 ++
 .../apache/airavata/agents/api/AgentException.java |  30 ++
 .../org/apache/airavata/agents/api/AgentStore.java | 103 +++++
 .../apache/airavata/agents/api/CommandOutput.java  |  16 +
 .../airavata/agents/api/JobSubmissionOutput.java   |  74 ++++
 modules/airavata-helix/agent-impl/pom.xml          |  27 ++
 .../airavata-helix/agent-impl/ssh-agent/pom.xml    |  76 ++++
 .../helix/agent/local/LocalAgentAdaptor.java       |  43 +++
 .../airavata/helix/agent/ssh/SshAdaptorParams.java | 116 ++++++
 .../airavata/helix/agent/ssh/SshAgentAdaptor.java  | 430 +++++++++++++++++++++
 .../helix/agent/ssh/StandardOutReader.java         |  83 ++++
 modules/airavata-helix/pom.xml                     |  24 ++
 modules/airavata-helix/task-api/pom.xml            |  41 ++
 .../apache/airavata/helix/task/api/TaskHelper.java |  13 +
 .../helix/task/api/annotation/TaskDef.java         |  18 +
 .../helix/task/api/annotation/TaskOutPort.java     |  18 +
 .../helix/task/api/annotation/TaskParam.java       |  20 +
 .../helix/task/api/support/AdaptorSupport.java     |  52 +++
 .../src/main/resources/application.properties      |   3 +
 .../task-api/src/main/resources/log4j.properties   |   9 +
 modules/airavata-helix/task-core/pom.xml           |  47 +++
 .../apache/airavata/helix/core/AbstractTask.java   | 108 ++++++
 .../org/apache/airavata/helix/core/OutPort.java    |  44 +++
 .../helix/core/controller/HelixController.java     |  91 +++++
 .../helix/core/participant/HelixParticipant.java   | 171 ++++++++
 .../helix/core/support/AdaptorSupportImpl.java     |  47 +++
 .../helix/core/support/TaskHelperImpl.java         |  16 +
 .../airavata/helix/core/util/PropertyResolver.java |  44 +++
 .../apache/airavata/helix/core/util/TaskUtil.java  | 103 +++++
 modules/airavata-helix/workflow-impl/pom.xml       |  44 +++
 .../airavata/helix/workflow/SimpleWorkflow.java    |  40 ++
 .../airavata/helix/workflow/WorkflowManager.java   |  94 +++++
 modules/helix-spectator/pom.xml                    |  50 +++
 .../helix/impl/participant/GlobalParticipant.java  |  68 ++++
 .../airavata/helix/impl/task/AiravataTask.java     | 293 ++++++++++++++
 .../airavata/helix/impl/task/DataStagingTask.java  |  19 +
 .../airavata/helix/impl/task/EnvSetupTask.java     |  64 +++
 .../helix/impl/task/submission/GroovyMapData.java  | 415 ++++++++++++++++++++
 .../helix/impl/task/submission/Script.java         |  43 +++
 .../helix/impl/task/submission/ScriptTag.java      |  13 +
 .../helix/impl/task/submission/SubmissionUtil.java |  10 +
 .../impl/task/submission/config/JobFactory.java    | 102 +++++
 .../submission/config/JobManagerConfiguration.java |  29 ++
 .../impl/task/submission/config/OutputParser.java  |  41 ++
 .../task/submission/config/RawCommandInfo.java     |  22 ++
 .../config/imp/ForkJobConfiguration.java           | 113 ++++++
 .../impl/task/submission/config/imp/JobUtil.java   |  58 +++
 .../submission/config/imp/LSFJobConfiguration.java | 120 ++++++
 .../submission/config/imp/PBSJobConfiguration.java | 122 ++++++
 .../config/imp/SlurmJobConfiguration.java          | 117 ++++++
 .../submission/config/imp/UGEJobConfiguration.java | 117 ++++++
 .../parser/AiravataCustomCommandOutputParser.java  |  56 +++
 .../config/imp/parser/ForkOutputParser.java        |  58 +++
 .../config/imp/parser/LSFOutputParser.java         | 132 +++++++
 .../config/imp/parser/PBSOutputParser.java         | 142 +++++++
 .../config/imp/parser/SlurmOutputParser.java       | 137 +++++++
 .../config/imp/parser/UGEOutputParser.java         | 108 ++++++
 .../submission/task/DefaultJobSubmissionTask.java  | 232 +++++++++++
 .../submission/task/ForkJobSubmissionTask.java     |  79 ++++
 .../task/submission/task/JobSubmissionTask.java    | 202 ++++++++++
 .../submission/task/LocalJobSubmissionTask.java    |  81 ++++
 .../helix/impl/workflow/SimpleWorkflow.java        |  31 ++
 .../src/main/resources/airavata-server.properties  | 334 ++++++++++++++++
 .../src/main/resources/application.properties      |   3 +
 .../src/main/resources/log4j.properties            |  11 +
 pom.xml                                            |   2 +
 69 files changed, 5575 insertions(+)

diff --git a/modules/airavata-helix/agent-api/pom.xml b/modules/airavata-helix/agent-api/pom.xml
new file mode 100644
index 0000000..02ee48a
--- /dev/null
+++ b/modules/airavata-helix/agent-api/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata-helix</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>agent-api</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>1.9.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-registry-cpi</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-registry-core</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <!--<build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>-->
+
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AdaptorParams.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AdaptorParams.java
new file mode 100644
index 0000000..ca6f80a
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AdaptorParams.java
@@ -0,0 +1,26 @@
+package org.apache.airavata.agents.api;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Base class for adaptor parameters; supports writing the parameters to and loading them back from a JSON file.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class AdaptorParams {
+
+    public Object loadFromFile(File file) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(file, this.getClass());
+    }
+
+    public void writeToFile(File file) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.writeValue(file, this);
+    }
+}
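
The loadFromFile/writeToFile pair gives subclasses JSON persistence for free. A minimal sketch of a round trip, using a made-up EndpointParams subclass and file path purely for illustration:

    import org.apache.airavata.agents.api.AdaptorParams;

    import java.io.File;
    import java.io.IOException;

    public class EndpointParams extends AdaptorParams {

        // Hypothetical parameter; Jackson needs a getter/setter pair to (de)serialize it.
        private String endpoint;

        public String getEndpoint() { return endpoint; }
        public void setEndpoint(String endpoint) { this.endpoint = endpoint; }

        public static void main(String[] args) throws IOException {
            EndpointParams params = new EndpointParams();
            params.setEndpoint("cluster.example.org");
            params.writeToFile(new File("/tmp/endpoint-param.json"));   // serialize to JSON

            // loadFromFile uses this.getClass(), so calling it on a subclass
            // instance deserializes into that subclass.
            EndpointParams restored =
                    (EndpointParams) new EndpointParams().loadFromFile(new File("/tmp/endpoint-param.json"));
            System.out.println(restored.getEndpoint());
        }
    }
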
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/Agent.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/Agent.java
new file mode 100644
index 0000000..f48aa3e
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/Agent.java
@@ -0,0 +1,10 @@
+package org.apache.airavata.agents.api;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class Agent {
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
new file mode 100644
index 0000000..2d295de
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
@@ -0,0 +1,23 @@
+package org.apache.airavata.agents.api;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Contract for agent adaptors that execute commands and manage files and directories on a compute resource.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public interface AgentAdaptor {
+
+    public void init(String computeResource, String gatewayId, String userId, String token) throws AgentException;
+
+    public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException;
+
+    public void createDirectory(String path) throws AgentException;
+
+    public void copyFile(String sourceFile, String destinationFile) throws AgentException;
+
+    public List<String> listDirectory(String path) throws AgentException;
+}
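
A rough sketch of how a caller might drive this contract once it has an implementation in hand (for example from AgentStore below); the resource, gateway, user and token identifiers are placeholders:

    import org.apache.airavata.agents.api.AgentAdaptor;
    import org.apache.airavata.agents.api.AgentException;
    import org.apache.airavata.agents.api.CommandOutput;

    public class AdaptorClient {

        public static void run(AgentAdaptor adaptor) throws AgentException {
            // Placeholder identifiers; real values come from the app catalog and credential store.
            adaptor.init("compute-resource-id", "gateway-id", "user-id", "credential-token");

            adaptor.createDirectory("/scratch/jobs/job-001");
            CommandOutput out = adaptor.executeCommand("hostname", "/scratch/jobs/job-001");
            System.out.println("exit " + out.getExitCode() + ": " + out.getStdOut());

            adaptor.listDirectory("/scratch/jobs").forEach(System.out::println);
        }
    }
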
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentException.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentException.java
new file mode 100644
index 0000000..9dfe50e
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentException.java
@@ -0,0 +1,30 @@
+package org.apache.airavata.agents.api;
+
+/**
+ * Exception thrown by agent adaptors and the agent store when an operation on a compute resource fails.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class AgentException extends Exception {
+
+    public AgentException() {
+        super();
+    }
+
+    public AgentException(String message) {
+        super(message);
+    }
+
+    public AgentException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public AgentException(Throwable cause) {
+        super(cause);
+    }
+
+    protected AgentException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentStore.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentStore.java
new file mode 100644
index 0000000..78f2276
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentStore.java
@@ -0,0 +1,103 @@
+package org.apache.airavata.agents.api;
+
+import java.io.*;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * Resolves the agent adaptor (and its parameter class) registered for a compute resource and instantiates them from an external jar.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class AgentStore {
+    public AgentAdaptor fetchAdaptor(String computeResource, String protocol, String authToken) throws AgentException {
+
+        AgentData agentData = getAgentDataForComputeResource(computeResource);
+
+        try {
+            URL[] urls = new URL[1];
+            urls[0] = new URL(agentData.getLibraryLocation());
+            URLClassLoader classLoader = new URLClassLoader(urls, AgentAdaptor.class.getClassLoader());
+
+            Class<?> clazz = classLoader.loadClass(agentData.getAdaptorClass());
+            AgentAdaptor agentAdaptor = (AgentAdaptor) clazz.newInstance();
+
+            Class<?> paramClazz = classLoader.loadClass(agentData.paramClass);
+            AdaptorParams adaptorParams = (AdaptorParams) paramClazz.newInstance();
+
+            Object paramsInit = adaptorParams.loadFromFile(new File(agentData.paramDataFile));
+
+            //agentAdaptor.init(paramsInit);
+            System.out.println("Done");
+
+            return agentAdaptor;
+
+        } catch (IllegalAccessException | ClassNotFoundException | InstantiationException | IOException e) {
+            e.printStackTrace();
+            throw new AgentException("Failed to fetch agent adaptor for compute resource " + computeResource, e);
+        }
+    }
+
+    public static void main(String args[]) throws InstantiationException, IOException, AgentException {
+        AgentStore store = new AgentStore();
+
+        AgentAdaptor agentAdaptor = store.fetchAdaptor("localhost", null, null);
+        System.out.println("Agent loaded");
+    }
+
+    private AgentData getAgentDataForComputeResource(String computeResource) {
+        if ("localhost".equals(computeResource)) {
+            return new AgentData().setLibraryLocation("file:///Users/dimuthu/code/fork/airavata-sandbox/airavata-helix/modules/agent-impl/ssh-agent/target/ssh-agent-1.0-SNAPSHOT-jar-with-dependencies.jar")
+                    .setAdaptorClass("org.apache.airavata.helix.agent.ssh.SshAgentAdaptor")
+                    .setParamClass("org.apache.airavata.helix.agent.ssh.SshAdaptorParams")
+                    .setParamDataFile("/tmp/ssh-param.json");
+        }
+
+        return null;
+    }
+
+    public static class AgentData {
+
+        private String libraryLocation;
+        private String adaptorClass;
+        private String paramClass;
+        private String paramDataFile;
+
+        public String getLibraryLocation() {
+            return libraryLocation;
+        }
+
+        public AgentData setLibraryLocation(String libraryLocation) {
+            this.libraryLocation = libraryLocation;
+            return this;
+        }
+
+        public String getAdaptorClass() {
+            return adaptorClass;
+        }
+
+        public AgentData setAdaptorClass(String adaptorClass) {
+            this.adaptorClass = adaptorClass;
+            return this;
+        }
+
+        public String getParamClass() {
+            return paramClass;
+        }
+
+        public AgentData setParamClass(String paramClass) {
+            this.paramClass = paramClass;
+            return this;
+        }
+
+        public String getParamDataFile() {
+            return paramDataFile;
+        }
+
+        public AgentData setParamDataFile(String paramDataFile) {
+            this.paramDataFile = paramDataFile;
+            return this;
+        }
+    }
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/CommandOutput.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/CommandOutput.java
new file mode 100644
index 0000000..94a0118
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/CommandOutput.java
@@ -0,0 +1,16 @@
+package org.apache.airavata.agents.api;
+
+import java.io.OutputStream;
+
+/**
+ * Output of a command executed through an agent adaptor: standard output, standard error and exit code.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public interface CommandOutput {
+
+    String getStdOut();
+    String getStdError();
+    Integer getExitCode();
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java
new file mode 100644
index 0000000..1858826
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java
@@ -0,0 +1,74 @@
+package org.apache.airavata.agents.api;
+
+public class JobSubmissionOutput {
+    private int exitCode = Integer.MIN_VALUE;
+    private String stdOut;
+    private String stdErr;
+    private String command;
+    private String jobId;
+    private boolean isJobSubmissionFailed;
+    private String failureReason;
+
+    public int getExitCode() {
+        return exitCode;
+    }
+
+    public JobSubmissionOutput setExitCode(int exitCode) {
+        this.exitCode = exitCode;
+        return this;
+    }
+
+    public String getStdOut() {
+        return stdOut;
+    }
+
+    public JobSubmissionOutput setStdOut(String stdOut) {
+        this.stdOut = stdOut;
+        return this;
+    }
+
+    public String getStdErr() {
+        return stdErr;
+    }
+
+    public JobSubmissionOutput setStdErr(String stdErr) {
+        this.stdErr = stdErr;
+        return this;
+    }
+
+    public String getCommand() {
+        return command;
+    }
+
+    public JobSubmissionOutput setCommand(String command) {
+        this.command = command;
+        return this;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public JobSubmissionOutput setJobId(String jobId) {
+        this.jobId = jobId;
+        return this;
+    }
+
+    public boolean isJobSubmissionFailed() {
+        return isJobSubmissionFailed;
+    }
+
+    public JobSubmissionOutput setJobSubmissionFailed(boolean jobSubmissionFailed) {
+        isJobSubmissionFailed = jobSubmissionFailed;
+        return this;
+    }
+
+    public String getFailureReason() {
+        return failureReason;
+    }
+
+    public JobSubmissionOutput setFailureReason(String failureReason) {
+        this.failureReason = failureReason;
+        return this;
+    }
+}
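
Every setter returns this, so the class is intended to be populated fluently. A hypothetical example with placeholder values:

    import org.apache.airavata.agents.api.JobSubmissionOutput;

    public class SubmissionOutputExample {
        public static void main(String[] args) {
            // Hypothetical values, for illustration only.
            JobSubmissionOutput output = new JobSubmissionOutput()
                    .setCommand("sbatch /scratch/jobs/job-001/job.slurm")
                    .setExitCode(0)
                    .setStdOut("Submitted batch job 123456")
                    .setJobId("123456");

            if (output.isJobSubmissionFailed()) {
                System.err.println("Submission failed: " + output.getFailureReason());
            }
        }
    }
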
diff --git a/modules/airavata-helix/agent-impl/pom.xml b/modules/airavata-helix/agent-impl/pom.xml
new file mode 100644
index 0000000..57f0c08
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/pom.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata-helix</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>agent-impl</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>ssh-agent</module>
+    </modules>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>agent-api</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/pom.xml b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
new file mode 100644
index 0000000..44cf919
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>agent-impl</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.17-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>ssh-agent</artifactId>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+            <version>0.1.53</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-registry-cpi</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-registry-core</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-credential-store</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4.1</version>
+                <configuration>
+                    <!-- get all project dependencies -->
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <!-- A MainClass entry in the manifest would make an executable jar -->
+
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <!-- bind to the packaging phase -->
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java
new file mode 100644
index 0000000..af507bf
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java
@@ -0,0 +1,43 @@
+package org.apache.airavata.helix.agent.local;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.agents.api.JobSubmissionOutput;
+
+import java.io.File;
+import java.util.List;
+
+public class LocalAgentAdaptor implements AgentAdaptor {
+
+
+
+    public void init(Object agentPams) throws AgentException {
+
+    }
+
+    @Override
+    public void init(String computeResource, String gatewayId, String userId, String token) throws AgentException {
+
+    }
+
+    @Override
+    public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
+        return null;
+    }
+
+    @Override
+    public void createDirectory(String path) throws AgentException {
+
+    }
+
+    @Override
+    public void copyFile(String sourceFile, String destinationFile) throws AgentException {
+
+    }
+
+    @Override
+    public List<String> listDirectory(String path) throws AgentException {
+        return null;
+    }
+}
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAdaptorParams.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAdaptorParams.java
new file mode 100644
index 0000000..f54ae60
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAdaptorParams.java
@@ -0,0 +1,116 @@
+package org.apache.airavata.helix.agent.ssh;
+
+import org.apache.airavata.agents.api.AdaptorParams;
+
+import java.io.*;
+
+/**
+ * Connection parameters for the SSH agent adaptor: host, port, user, password or key pair, and host-key settings.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class SshAdaptorParams extends AdaptorParams implements Serializable {
+
+    private int port = 22;
+    private String hostName;
+    private String userName;
+
+    private String password;
+
+    private byte[] publicKey;
+    private byte[] privateKey;
+    private String passphrase;
+
+    private String knownHostsFilePath;
+    private boolean strictHostKeyChecking;
+
+    public int getPort() {
+        return port;
+    }
+
+    public SshAdaptorParams setPort(int port) {
+        this.port = port;
+        return this;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public SshAdaptorParams setHostName(String hostName) {
+        this.hostName = hostName;
+        return this;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public SshAdaptorParams setUserName(String userName) {
+        this.userName = userName;
+        return this;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public SshAdaptorParams setPassword(String password) {
+        this.password = password;
+        return this;
+    }
+
+    public byte[] getPublicKey() {
+        return publicKey;
+    }
+
+    public SshAdaptorParams setPublicKey(byte[] publicKey) {
+        this.publicKey = publicKey;
+        return this;
+    }
+
+    public byte[] getPrivateKey() {
+        return privateKey;
+    }
+
+    public SshAdaptorParams setPrivateKey(byte[] privateKey) {
+        this.privateKey = privateKey;
+        return this;
+    }
+
+    public String getPassphrase() {
+        return passphrase;
+    }
+
+    public SshAdaptorParams setPassphrase(String passphrase) {
+        this.passphrase = passphrase;
+        return this;
+    }
+
+    public String getKnownHostsFilePath() {
+        return knownHostsFilePath;
+    }
+
+    public SshAdaptorParams setKnownHostsFilePath(String knownHostsFilePath) {
+        this.knownHostsFilePath = knownHostsFilePath;
+        return this;
+    }
+
+    public boolean isStrictHostKeyChecking() {
+        return strictHostKeyChecking;
+    }
+
+    public SshAdaptorParams setStrictHostKeyChecking(boolean strictHostKeyChecking) {
+        this.strictHostKeyChecking = strictHostKeyChecking;
+        return this;
+    }
+
+    public static void main(String args[]) throws IOException {
+        SshAdaptorParams params = new SshAdaptorParams();
+        params.setUserName("dimuthu");
+        params.setPassword("upe");
+        params.setHostName("localhost");
+        params.writeToFile(new File("/tmp/ssh-param.json"));
+    }
+}
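
The main() above writes a password-based parameter file; a key-based variant might look like the following sketch (user, host and key paths are placeholders):

    import org.apache.airavata.helix.agent.ssh.SshAdaptorParams;

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class KeyParamWriter {
        public static void main(String[] args) throws IOException {
            SshAdaptorParams params = new SshAdaptorParams()
                    .setUserName("testuser")
                    .setHostName("cluster.example.org")
                    .setPort(22)
                    .setPrivateKey(Files.readAllBytes(Paths.get("/home/testuser/.ssh/id_rsa")))
                    .setPublicKey(Files.readAllBytes(Paths.get("/home/testuser/.ssh/id_rsa.pub")))
                    .setPassphrase("key-passphrase")
                    .setStrictHostKeyChecking(false);
            params.writeToFile(new File("/tmp/ssh-param.json"));
        }
    }
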
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
new file mode 100644
index 0000000..19b429c
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -0,0 +1,430 @@
+package org.apache.airavata.helix.agent.ssh;
+
+import com.jcraft.jsch.*;
+import org.apache.airavata.agents.api.*;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.DBUtil;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ComputeResource;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+
+/**
+ * Agent adaptor that executes commands and transfers files on a remote compute resource over SSH using JSch.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class SshAgentAdaptor implements AgentAdaptor {
+
+    private Session session = null;
+    private AppCatalog appCatalog;
+    private ComputeResourceDescription computeResourceDescription;
+    private ResourceJobManager resourceJobManager;
+    private SSHJobSubmission sshJobSubmission;
+
+    public void init(AdaptorParams adaptorParams) throws AgentException {
+
+        if (adaptorParams instanceof SshAdaptorParams) {
+            SshAdaptorParams params = SshAdaptorParams.class.cast(adaptorParams);
+            JSch jSch = new JSch();
+            try {
+
+                if (params.getPassword() != null) {
+                    this.session = jSch.getSession(params.getUserName(), params.getHostName(), params.getPort());
+                    session.setPassword(params.getPassword());
+                    session.setUserInfo(new SftpUserInfo(params.getPassword()));
+                } else {
+                    jSch.addIdentity(UUID.randomUUID().toString(), params.getPrivateKey(), params.getPublicKey(),
+                            params.getPassphrase().getBytes());
+                    this.session = jSch.getSession(params.getUserName(), params.getHostName(),
+                            params.getPort());
+                    session.setUserInfo(new DefaultUserInfo(params.getUserName(), null, params.getPassphrase()));
+                }
+
+                if (params.isStrictHostKeyChecking()) {
+                    jSch.setKnownHosts(params.getKnownHostsFilePath());
+                } else {
+                    session.setConfig("StrictHostKeyChecking", "no");
+                }
+                session.connect(); // 0 connection timeout
+
+            } catch (JSchException e) {
+                throw new AgentException("Could not create ssh session for host " + params.getHostName(), e);
+            }
+        } else {
+            throw new AgentException("Unknown parameter type to ssh initialize agent adaptor. Required SshAdaptorParams type");
+        }
+
+    }
+
+    @Override
+    public void init(String computeResourceId, String gatewayId, String userId, String token) throws AgentException {
+        try {
+            this.appCatalog = RegistryFactory.getAppCatalog();
+            this.computeResourceDescription = this.appCatalog.getComputeResource().getComputeResource(computeResourceId);
+            List<JobSubmissionInterface> jobSubmissionInterfaces = this.computeResourceDescription.getJobSubmissionInterfaces();
+            Optional<JobSubmissionInterface> jobSubmissionInterfaceOp = jobSubmissionInterfaces.stream()
+                    .filter(iface -> JobSubmissionProtocol.SSH == iface.getJobSubmissionProtocol() ||
+                            JobSubmissionProtocol.SSH_FORK == iface.getJobSubmissionProtocol())
+                    .findFirst();
+
+            JobSubmissionInterface jobSubmissionInterface = jobSubmissionInterfaceOp.orElseThrow(() -> new AgentException("Could not find a Job submission interface with SSH"));
+
+            this.sshJobSubmission = this.appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+            this.resourceJobManager = sshJobSubmission.getResourceJobManager();
+
+            String jdbcUrl = ServerSettings.getCredentialStoreDBURL();
+            String jdbcUsr = ServerSettings.getCredentialStoreDBUser();
+            String jdbcPass = ServerSettings.getCredentialStoreDBPassword();
+            String driver = ServerSettings.getCredentialStoreDBDriver();
+            CredentialReaderImpl credentialReader = new CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass, driver));
+            Credential credential = credentialReader.getCredential(gatewayId, token);
+
+            if (credential instanceof SSHCredential) {
+                SSHCredential sshCredential = SSHCredential.class.cast(credential);
+                SshAdaptorParams adaptorParams = new SshAdaptorParams();
+                adaptorParams.setHostName(this.computeResourceDescription.getHostName());
+                adaptorParams.setUserName(userId);
+                adaptorParams.setPassphrase(sshCredential.getPassphrase());
+                adaptorParams.setPrivateKey(sshCredential.getPrivateKey());
+                adaptorParams.setPublicKey(sshCredential.getPublicKey());
+                adaptorParams.setStrictHostKeyChecking(false);
+                init(adaptorParams);
+            }
+
+        } catch (AppCatalogException e) {
+            e.printStackTrace();
+            throw new AgentException(e);
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
+            throw new AgentException(e);
+        } catch (IllegalAccessException e) {
+            e.printStackTrace();
+            throw new AgentException(e);
+        } catch (InstantiationException e) {
+            e.printStackTrace();
+            throw new AgentException(e);
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+            throw new AgentException(e);
+        } catch (CredentialStoreException e) {
+            e.printStackTrace();
+            throw new AgentException(e);
+        }
+    }
+
+    public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
+        StandardOutReader commandOutput = new StandardOutReader();
+        try {
+            ChannelExec channelExec = ((ChannelExec) session.openChannel("exec"));
+            channelExec.setCommand(command);
+            channelExec.setInputStream(null);
+            channelExec.setErrStream(commandOutput.getStandardError());
+            channelExec.connect();
+            commandOutput.onOutput(channelExec);
+            return commandOutput;
+        } catch (JSchException e) {
+            throw new AgentException(e);
+        }
+    }
+
+    public void createDirectory(String path) throws AgentException {
+        try {
+            String command = "mkdir -p " + path;
+            Channel channel = session.openChannel("exec");
+            StandardOutReader stdOutReader = new StandardOutReader();
+
+            ((ChannelExec) channel).setCommand(command);
+
+            ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+            try {
+                channel.connect();
+            } catch (JSchException e) {
+
+                channel.disconnect();
+                System.out.println("Unable to retrieve command output. Command - " + command +
+                        " on server - " + session.getHost() + ":" + session.getPort() +
+                        " connecting user name - "
+                        + session.getUserName());
+                throw new AgentException(e);
+            }
+            stdOutReader.onOutput(channel);
+            if (stdOutReader.getStdErrorString().contains("mkdir:")) {
+                throw new AgentException(stdOutReader.getStdErrorString());
+            }
+
+            channel.disconnect();
+        } catch (JSchException e) {
+            throw new AgentException(e);
+        }
+    }
+
+    public void copyFile(String localFile, String remoteFile) throws AgentException {
+        FileInputStream fis = null;
+        String prefix = null;
+        if (new File(localFile).isDirectory()) {
+            prefix = localFile + File.separator;
+        }
+        boolean ptimestamp = true;
+
+        try {
+            // exec 'scp -t rfile' remotely
+            String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile;
+            Channel channel = session.openChannel("exec");
+
+            StandardOutReader stdOutReader = new StandardOutReader();
+            ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+            ((ChannelExec) channel).setCommand(command);
+
+            // get I/O streams for remote scp
+            OutputStream out = channel.getOutputStream();
+            InputStream in = channel.getInputStream();
+
+            channel.connect();
+
+            if (checkAck(in) != 0) {
+                String error = "Error Reading input Stream";
+                //log.error(error);
+                throw new AgentException(error);
+            }
+
+            File _lfile = new File(localFile);
+
+            if (ptimestamp) {
+                command = "T" + (_lfile.lastModified() / 1000) + " 0";
+                // The access time should be sent here,
+                // but it is not accessible with JavaAPI ;-<
+                command += (" " + (_lfile.lastModified() / 1000) + " 0\n");
+                out.write(command.getBytes());
+                out.flush();
+                if (checkAck(in) != 0) {
+                    String error = "Error Reading input Stream";
+                    throw new AgentException(error);
+                }
+            }
+
+            // send "C0644 filesize filename", where filename should not include '/'
+            long filesize = _lfile.length();
+            command = "C0644 " + filesize + " ";
+            if (localFile.lastIndexOf('/') > 0) {
+                command += localFile.substring(localFile.lastIndexOf('/') + 1);
+            } else {
+                command += localFile;
+            }
+            command += "\n";
+            out.write(command.getBytes());
+            out.flush();
+            if (checkAck(in) != 0) {
+                String error = "Error Reading input Stream";
+                //log.error(error);
+                throw new AgentException(error);
+            }
+
+            // send a content of localFile
+            fis = new FileInputStream(localFile);
+            byte[] buf = new byte[1024];
+            while (true) {
+                int len = fis.read(buf, 0, buf.length);
+                if (len <= 0) break;
+                out.write(buf, 0, len); //out.flush();
+            }
+            fis.close();
+            fis = null;
+            // send '\0'
+            buf[0] = 0;
+            out.write(buf, 0, 1);
+            out.flush();
+            if (checkAck(in) != 0) {
+                String error = "Error Reading input Stream";
+                //log.error(error);
+                throw new AgentException(error);
+            }
+            out.close();
+            stdOutReader.onOutput(channel);
+
+
+            channel.disconnect();
+            if (stdOutReader.getStdErrorString().contains("scp:")) {
+                throw new AgentException(stdOutReader.getStdErrorString());
+            }
+            //since remote file is always a file  we just return the file
+            //return remoteFile;
+        } catch (JSchException e) {
+            e.printStackTrace();
+            throw new AgentException(e);
+        } catch (FileNotFoundException e) {
+            e.printStackTrace();
+            throw new AgentException(e);
+        } catch (IOException e) {
+            e.printStackTrace();
+            throw new AgentException(e);
+        }
+    }
+
+    @Override
+    public List<String> listDirectory(String path) throws AgentException {
+
+        try {
+            String command = "ls " + path;
+            Channel channel = session.openChannel("exec");
+            StandardOutReader stdOutReader = new StandardOutReader();
+
+            ((ChannelExec) channel).setCommand(command);
+
+
+            ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+            try {
+                channel.connect();
+            } catch (JSchException e) {
+
+                channel.disconnect();
+//            session.disconnect();
+
+                throw new AgentException("Unable to retrieve command output. Command - " + command +
+                        " on server - " + session.getHost() + ":" + session.getPort() +
+                        " connecting user name - "
+                        + session.getUserName(), e);
+            }
+            stdOutReader.onOutput(channel);
+            stdOutReader.getStdOutputString();
+            if (stdOutReader.getStdErrorString().contains("ls:")) {
+                throw new AgentException(stdOutReader.getStdErrorString());
+            }
+            channel.disconnect();
+            return Arrays.asList(stdOutReader.getStdOutputString().split("\n"));
+
+        } catch (JSchException e) {
+            throw new AgentException(e);
+        }
+    }
+
+    private static class DefaultUserInfo implements UserInfo, UIKeyboardInteractive {
+
+        private String userName;
+        private String password;
+        private String passphrase;
+
+        public DefaultUserInfo(String userName, String password, String passphrase) {
+            this.userName = userName;
+            this.password = password;
+            this.passphrase = passphrase;
+        }
+
+        @Override
+        public String getPassphrase() {
+            return passphrase;
+        }
+
+        @Override
+        public String getPassword() {
+            return password;
+        }
+
+        @Override
+        public boolean promptPassword(String s) {
+            return true;
+        }
+
+        @Override
+        public boolean promptPassphrase(String s) {
+            return false;
+        }
+
+        @Override
+        public boolean promptYesNo(String s) {
+            return false;
+        }
+
+        @Override
+        public void showMessage(String s) {
+
+        }
+
+        @Override
+        public String[] promptKeyboardInteractive(String destination, String name, String instruction, String[] prompt, boolean[] echo) {
+            return new String[0];
+        }
+    }
+
+    class SftpUserInfo implements UserInfo {
+
+        String password = null;
+
+        public SftpUserInfo(String password) {
+            this.password = password;
+        }
+
+        @Override
+        public String getPassphrase() {
+            return null;
+        }
+
+        @Override
+        public String getPassword() {
+            return password;
+        }
+
+        public void setPassword(String passwd) {
+            password = passwd;
+        }
+
+        @Override
+        public boolean promptPassphrase(String message) {
+            return false;
+        }
+
+        @Override
+        public boolean promptPassword(String message) {
+            return false;
+        }
+
+        @Override
+        public boolean promptYesNo(String message) {
+            return true;
+        }
+
+        @Override
+        public void showMessage(String message) {
+        }
+    }
+
+    static int checkAck(InputStream in) throws IOException {
+        int b = in.read();
+        if (b == 0) return b;
+        if (b == -1) return b;
+
+        if (b == 1 || b == 2) {
+            StringBuffer sb = new StringBuffer();
+            int c;
+            do {
+                c = in.read();
+                sb.append((char) c);
+            }
+            while (c != '\n');
+            // b == 1 means a recoverable error, b == 2 a fatal error; either way surface the message
+            System.out.print(sb.toString());
+            //log.warn(sb.toString());
+        }
+        return b;
+    }
+}
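
Putting the pieces together, a password-based session could be exercised roughly as follows; host, credentials and paths are placeholders, and the cast relies on executeCommand returning a StandardOutReader:

    import org.apache.airavata.agents.api.AgentException;
    import org.apache.airavata.agents.api.CommandOutput;
    import org.apache.airavata.helix.agent.ssh.SshAdaptorParams;
    import org.apache.airavata.helix.agent.ssh.SshAgentAdaptor;
    import org.apache.airavata.helix.agent.ssh.StandardOutReader;

    public class SshAdaptorDemo {
        public static void main(String[] args) throws AgentException {
            SshAdaptorParams params = new SshAdaptorParams()
                    .setHostName("localhost")
                    .setUserName("testuser")
                    .setPassword("secret")
                    .setStrictHostKeyChecking(false);

            SshAgentAdaptor adaptor = new SshAgentAdaptor();
            adaptor.init(params);                               // opens the JSch session

            CommandOutput out = adaptor.executeCommand("uname -a", null);
            System.out.println(((StandardOutReader) out).getStdOutputString());

            adaptor.createDirectory("/tmp/agent-test");
            adaptor.copyFile("/tmp/local-input.dat", "/tmp/agent-test/input.dat");
        }
    }
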
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java
new file mode 100644
index 0000000..49c036e
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java
@@ -0,0 +1,83 @@
+package org.apache.airavata.helix.agent.ssh;
+
+import com.jcraft.jsch.Channel;
+import org.apache.airavata.agents.api.CommandOutput;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Captures the standard output and standard error of a JSch channel and exposes them as a CommandOutput.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class StandardOutReader implements CommandOutput {
+
+    // TODO: improve this; callers need direct (streaming) access to stdout and the exit code
+
+    String stdOutputString = null;
+    ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
+    private int exitCode;
+
+    public void onOutput(Channel channel) {
+        try {
+            StringBuffer pbsOutput = new StringBuffer("");
+            InputStream inputStream =  channel.getInputStream();
+            byte[] tmp = new byte[1024];
+            do {
+                while (inputStream.available() > 0) {
+                    int i = inputStream.read(tmp, 0, 1024);
+                    if (i < 0) break;
+                    pbsOutput.append(new String(tmp, 0, i));
+                }
+            } while (!channel.isClosed()) ;
+            String output = pbsOutput.toString();
+            this.setStdOutputString(output);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void exitCode(int code) {
+        System.out.println("Program exit code - " + code);
+        this.exitCode = code;
+    }
+
+    public Integer getExitCode() {
+        return exitCode;
+    }
+
+    public String getStdOutputString() {
+        return stdOutputString;
+    }
+
+    public void setStdOutputString(String stdOutputString) {
+        this.stdOutputString = stdOutputString;
+    }
+
+    public String getStdErrorString() {
+        return errorStream.toString();
+    }
+
+    public OutputStream getStandardError() {
+        return errorStream;
+    }
+
+    @Override
+    public String getStdOut() {
+        return stdOutputString;
+    }
+
+    @Override
+    public String getStdError() {
+        return errorStream.toString();
+    }
+
+    public String getExitCommand() {
+        return null;
+    }
+}
diff --git a/modules/airavata-helix/pom.xml b/modules/airavata-helix/pom.xml
new file mode 100644
index 0000000..05938fd
--- /dev/null
+++ b/modules/airavata-helix/pom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <packaging>pom</packaging>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>airavata-helix</artifactId>
+
+    <modules>
+        <module>agent-api</module>
+        <module>agent-impl</module>
+        <module>task-api</module>
+        <module>task-core</module>
+        <module>workflow-impl</module>
+    </modules>
+
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/task-api/pom.xml b/modules/airavata-helix/task-api/pom.xml
new file mode 100644
index 0000000..41ec00c
--- /dev/null
+++ b/modules/airavata-helix/task-api/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata-helix</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>task-api</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.helix</groupId>
+            <artifactId>helix-core</artifactId>
+            <version>0.6.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>agent-api</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <!--<build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>-->
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java
new file mode 100644
index 0000000..07de06e
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java
@@ -0,0 +1,13 @@
+package org.apache.airavata.helix.task.api;
+
+import org.apache.airavata.helix.task.api.support.AdaptorSupport;
+
+/**
+ * Runtime helper handed to tasks; exposes adaptor support for communicating with compute resources.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public interface TaskHelper {
+    public AdaptorSupport getAdaptorSupport();
+}
diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskDef.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskDef.java
new file mode 100644
index 0000000..3e4b7f1
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskDef.java
@@ -0,0 +1,18 @@
+package org.apache.airavata.helix.task.api.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a class as a task definition and assigns it a logical name.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TaskDef {
+    public String name();
+}
diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskOutPort.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskOutPort.java
new file mode 100644
index 0000000..a22c387
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskOutPort.java
@@ -0,0 +1,18 @@
+package org.apache.airavata.helix.task.api.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a field as an out port of a task, i.e. a link to a possible next task in the workflow.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface TaskOutPort {
+    public String name();
+}
diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskParam.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskParam.java
new file mode 100644
index 0000000..198b172
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/annotation/TaskParam.java
@@ -0,0 +1,20 @@
+package org.apache.airavata.helix.task.api.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface TaskParam {
+    public String name();
+    public String defaultValue() default "";
+    public boolean mandatory() default false;
+}
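
Taken together with AbstractTask from task-core, a task definition might look like the sketch below; the task name, parameter and out-port are hypothetical, and the out-port field is assumed to use the task-core OutPort type:

    import org.apache.airavata.helix.core.AbstractTask;
    import org.apache.airavata.helix.core.OutPort;
    import org.apache.airavata.helix.task.api.TaskHelper;
    import org.apache.airavata.helix.task.api.annotation.TaskDef;
    import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
    import org.apache.airavata.helix.task.api.annotation.TaskParam;
    import org.apache.helix.task.TaskResult;

    @TaskDef(name = "EchoTask")
    public class EchoTask extends AbstractTask {

        @TaskParam(name = "message", mandatory = true)
        private String message;

        @TaskOutPort(name = "nextTask")
        private OutPort nextTask;

        @Override
        public TaskResult onRun(TaskHelper helper) {
            System.out.println("Echo: " + message);
            return new TaskResult(TaskResult.Status.COMPLETED, "Echoed message");
        }

        @Override
        public void onCancel() {
            // nothing to clean up in this sketch
        }

        // Accessors for the annotated parameter, in case parameter (de)serialization goes through getters/setters.
        public String getMessage() { return message; }
        public void setMessage(String message) { this.message = message; }
    }
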
diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
new file mode 100644
index 0000000..3e24aaa
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
@@ -0,0 +1,52 @@
+package org.apache.airavata.helix.task.api.support;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.agents.api.JobSubmissionOutput;
+
+import java.io.File;
+
+/**
+ * Gives tasks access to agent adaptors for executing commands, creating directories and copying files on compute resources.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public interface AdaptorSupport {
+    public void initializeAdaptor();
+
+    public AgentAdaptor fetchAdaptor(String computeResource, String protocol, String authToken) throws Exception;
+
+
+    /**
+     *
+     * @param command
+     * @param workingDirectory
+     * @param computeResourceId
+     * @param protocol
+     * @param authToken
+     * @throws Exception
+     */
+    public CommandOutput executeCommand(String command, String workingDirectory, String computeResourceId, String protocol, String authToken) throws Exception;
+
+    /**
+     *
+     * @param path
+     * @param computeResourceId
+     * @param protocol
+     * @param authToken
+     * @throws Exception
+     */
+    public void createDirectory(String path, String computeResourceId, String protocol, String authToken) throws Exception;
+
+    /**
+     *
+     * @param sourceFile
+     * @param destinationFile
+     * @param computeResourceId
+     * @param protocol
+     * @param authToken
+     * @throws Exception
+     */
+    public void copyFile(String sourceFile, String destinationFile, String computeResourceId, String protocol, String authToken) throws Exception;
+}
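
A task reaches this interface through TaskHelper.getAdaptorSupport(). A minimal sketch of running a remote command with it; the compute resource id, protocol label and token are placeholders:

    import org.apache.airavata.agents.api.CommandOutput;
    import org.apache.airavata.helix.task.api.support.AdaptorSupport;

    public class AdaptorSupportSketch {

        static int runRemote(AdaptorSupport adaptorSupport) throws Exception {
            adaptorSupport.createDirectory("/scratch/exp-001",
                    "compute-resource-id", "SSH", "credential-token");
            CommandOutput out = adaptorSupport.executeCommand("sbatch job.slurm", "/scratch/exp-001",
                    "compute-resource-id", "SSH", "credential-token");
            return out.getExitCode();
        }
    }
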
diff --git a/modules/airavata-helix/task-api/src/main/resources/application.properties b/modules/airavata-helix/task-api/src/main/resources/application.properties
new file mode 100644
index 0000000..733515f
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/resources/application.properties
@@ -0,0 +1,3 @@
+zookeeper.connection.url=localhost:2199
+helix.cluster.name=AiravataDemoCluster
+helix.controller.name=controller-1
\ No newline at end of file
diff --git a/modules/airavata-helix/task-api/src/main/resources/log4j.properties b/modules/airavata-helix/task-api/src/main/resources/log4j.properties
new file mode 100644
index 0000000..5e31e3c
--- /dev/null
+++ b/modules/airavata-helix/task-api/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
diff --git a/modules/airavata-helix/task-core/pom.xml b/modules/airavata-helix/task-core/pom.xml
new file mode 100644
index 0000000..df72dac
--- /dev/null
+++ b/modules/airavata-helix/task-core/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata-helix</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>task-core</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.helix</groupId>
+            <artifactId>helix-core</artifactId>
+            <version>0.6.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>task-api</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>agent-api</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <!--<build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>-->
+
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
new file mode 100644
index 0000000..04fa37f
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
@@ -0,0 +1,108 @@
+package org.apache.airavata.helix.core;
+
+import org.apache.airavata.helix.core.util.TaskUtil;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.helix.HelixManager;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+
+/**
+ * Base class for all Airavata Helix tasks. Deserializes the annotated task parameters at init time and uses
+ * workflow-scoped user content to decide whether this job is the next one that should actually run.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public abstract class AbstractTask extends UserContentStore implements Task {
+
+    private static final String NEXT_JOB = "next-job";
+    private static final String WORKFLOW_STARTED = "workflow-started";
+
+    @TaskParam(name = "taskId")
+    private String taskId;
+
+    private TaskCallbackContext callbackContext;
+    private TaskHelper taskHelper;
+
+    @Override
+    public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
+        super.init(manager, workflowName, jobName, taskName);
+        try {
+            TaskUtil.deserializeTaskData(this, this.callbackContext.getTaskConfig().getConfigMap());
+        } catch (IllegalAccessException | InstantiationException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public final TaskResult run() {
+        boolean isThisNextJob = getUserContent(WORKFLOW_STARTED, Scope.WORKFLOW) == null ||
+                this.callbackContext.getJobConfig().getJobId()
+                        .equals(this.callbackContext.getJobConfig().getWorkflow() + "_" + getUserContent(NEXT_JOB, Scope.WORKFLOW));
+        if (isThisNextJob) {
+            return onRun(this.taskHelper);
+        } else {
+            return new TaskResult(TaskResult.Status.COMPLETED, "Not a target job");
+        }
+    }
+
+    @Override
+    public final void cancel() {
+        onCancel();
+    }
+
+    public abstract TaskResult onRun(TaskHelper helper);
+
+    public abstract void onCancel();
+
+    protected void publishErrors(Throwable e) {
+        // TODO Publish through kafka channel with task and workflow id
+        e.printStackTrace();
+    }
+
+    public void sendNextJob(String jobId) {
+        putUserContent(WORKFLOW_STARTED, "TRUE", Scope.WORKFLOW);
+        if (jobId != null) {
+            putUserContent(NEXT_JOB, jobId, Scope.WORKFLOW);
+        }
+    }
+
+    protected void setContextVariable(String key, String value) {
+        putUserContent(key, value, Scope.WORKFLOW);
+    }
+
+    protected String getContextVariable(String key) {
+        return getUserContent(key, Scope.WORKFLOW);
+    }
+
+    // Getters and setters
+
+    public String getTaskId() {
+        return taskId;
+    }
+
+    public AbstractTask setTaskId(String taskId) {
+        this.taskId = taskId;
+        return this;
+    }
+
+    public TaskCallbackContext getCallbackContext() {
+        return callbackContext;
+    }
+
+    public AbstractTask setCallbackContext(TaskCallbackContext callbackContext) {
+        this.callbackContext = callbackContext;
+        return this;
+    }
+
+    public TaskHelper getTaskHelper() {
+        return taskHelper;
+    }
+
+    public AbstractTask setTaskHelper(TaskHelper taskHelper) {
+        this.taskHelper = taskHelper;
+        return this;
+    }
+}
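
For orientation, a minimal sketch of a concrete task built on AbstractTask (illustration only, not part of this diff); the task name, parameter, and port wiring are hypothetical and follow the same pattern as the EnvSetupTask added later in this commit:

    import org.apache.airavata.helix.core.AbstractTask;
    import org.apache.airavata.helix.core.OutPort;
    import org.apache.airavata.helix.task.api.TaskHelper;
    import org.apache.airavata.helix.task.api.annotation.TaskDef;
    import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
    import org.apache.airavata.helix.task.api.annotation.TaskParam;
    import org.apache.helix.task.TaskResult;

    @TaskDef(name = "Echo Task")
    public class EchoTask extends AbstractTask {

        @TaskParam(name = "message")
        private String message;

        @TaskOutPort(name = "Success Port")
        private OutPort successPort;

        @Override
        public TaskResult onRun(TaskHelper helper) {
            System.out.println("Echo from task " + getTaskId() + ": " + message);
            // Route the workflow to the next job wired to the success port
            return successPort.invoke(new TaskResult(TaskResult.Status.COMPLETED, "Echoed"));
        }

        @Override
        public void onCancel() {
            // nothing to clean up
        }

        public void setMessage(String message) { this.message = message; }
        public void setSuccessPort(OutPort successPort) { this.successPort = successPort; }
    }
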
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/OutPort.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/OutPort.java
new file mode 100644
index 0000000..99dc37c
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/OutPort.java
@@ -0,0 +1,44 @@
+package org.apache.airavata.helix.core;
+
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+
+/**
+ * Represents an outgoing edge of a task. Invoking the port records the id of the next job in workflow scope
+ * and passes the task result through unchanged.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class OutPort {
+
+    private String nextJobId;
+    private AbstractTask task;
+
+    public OutPort(String nextJobId, AbstractTask task) {
+        this.nextJobId = nextJobId;
+        this.task = task;
+    }
+
+    public TaskResult invoke(TaskResult taskResult) {
+        task.sendNextJob(nextJobId);
+        return taskResult;
+    }
+
+    public String getNextJobId() {
+        return nextJobId;
+    }
+
+    public OutPort setNextJobId(String nextJobId) {
+        this.nextJobId = nextJobId;
+        return this;
+    }
+
+    public AbstractTask getTask() {
+        return task;
+    }
+
+    public OutPort setTask(AbstractTask task) {
+        this.task = task;
+        return this;
+    }
+}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/controller/HelixController.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/controller/HelixController.java
new file mode 100644
index 0000000..cdc27f7
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/controller/HelixController.java
@@ -0,0 +1,91 @@
+package org.apache.airavata.helix.core.controller;
+
+import org.apache.airavata.helix.core.util.PropertyResolver;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Standalone Helix controller for the Airavata task cluster. Reads the cluster name, controller name, and
+ * ZooKeeper address from the given property file and blocks until stop() is called.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class HelixController implements Runnable {
+
+    private static final Logger logger = LogManager.getLogger(HelixController.class);
+
+    private String clusterName;
+    private String controllerName;
+    private String zkAddress;
+    private org.apache.helix.HelixManager zkHelixManager;
+
+    private CountDownLatch startLatch = new CountDownLatch(1);
+    private CountDownLatch stopLatch = new CountDownLatch(1);
+
+    public HelixController(String propertyFile) throws IOException {
+
+        PropertyResolver propertyResolver = new PropertyResolver();
+        propertyResolver.loadInputStream(this.getClass().getClassLoader().getResourceAsStream(propertyFile));
+
+        this.clusterName = propertyResolver.get("helix.cluster.name");
+        this.controllerName = propertyResolver.get("helix.controller.name");
+        this.zkAddress = propertyResolver.get("zookeeper.connection.url");
+    }
+
+    public void run() {
+        try {
+            zkHelixManager = HelixControllerMain.startHelixController(zkAddress, clusterName,
+                    controllerName, HelixControllerMain.STANDALONE);
+            startLatch.countDown();
+            stopLatch.await();
+        } catch (Exception ex) {
+            logger.error("Error in run() for Controller: " + controllerName + ", reason: " + ex, ex);
+        } finally {
+            disconnect();
+        }
+    }
+
+    public void start() {
+        new Thread(this).start();
+        try {
+            startLatch.await();
+            logger.info("Controller: " + controllerName + ", has connected to cluster: " + clusterName);
+
+            Runtime.getRuntime().addShutdownHook(
+                    new Thread() {
+                        @Override
+                        public void run() {
+                            disconnect();
+                        }
+                    }
+            );
+
+        } catch (InterruptedException ex) {
+            logger.error("Controller: " + controllerName + ", is interrupted! reason: " + ex, ex);
+        }
+    }
+
+    public void stop() {
+        stopLatch.countDown();
+    }
+
+    private void disconnect() {
+        if (zkHelixManager != null) {
+            logger.info("Controller: " + controllerName + ", has disconnected from cluster: " + clusterName);
+            zkHelixManager.disconnect();
+        }
+    }
+
+    public static void main(String args[]) {
+        try {
+            HelixController helixController = new HelixController("application.properties");
+            helixController.start();
+        } catch (IOException e) {
+            logger.error("Failed to start the Helix controller", e);
+        }
+    }
+}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
new file mode 100644
index 0000000..190b866
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
@@ -0,0 +1,171 @@
+package org.apache.airavata.helix.core.participant;
+
+import org.apache.airavata.helix.core.support.TaskHelperImpl;
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.core.util.PropertyResolver;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generic Helix participant that registers a single task type (plus the OnlineOffline state model) and
+ * executes instances of that task inside the cluster.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class HelixParticipant <T extends AbstractTask> implements Runnable {
+
+    private static final Logger logger = LogManager.getLogger(HelixParticipant.class);
+
+    private String zkAddress;
+    private String clusterName;
+    private String participantName;
+    private ZKHelixManager zkHelixManager;
+    private String taskTypeName;
+    private PropertyResolver propertyResolver;
+    private Class<T> taskClass;
+
+    public HelixParticipant(String propertyFile, Class<T> taskClass, String taskTypeName) throws IOException {
+
+        logger.info("Initializing Participant Node");
+
+        this.propertyResolver = new PropertyResolver();
+        propertyResolver.loadInputStream(this.getClass().getClassLoader().getResourceAsStream(propertyFile));
+
+        this.zkAddress = propertyResolver.get("zookeeper.connection.url");
+        this.clusterName = propertyResolver.get("helix.cluster.name");
+        this.participantName = propertyResolver.get("participant.name");
+        this.taskTypeName = taskTypeName;
+        this.taskClass = taskClass;
+
+        logger.info("Zookeper connection url " + zkAddress);
+        logger.info("Cluster name " + clusterName);
+        logger.info("Participant name " + participantName);
+        logger.info("Task type " + taskTypeName);
+        if (taskClass != null) {
+            logger.info("Task class " + taskClass.getCanonicalName());
+        }
+    }
+
+    public Map<String, TaskFactory> getTaskFactory() {
+        Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+
+        TaskFactory taskFac = new TaskFactory() {
+            public Task createNewTask(TaskCallbackContext context) {
+                try {
+                    return taskClass.newInstance()
+                            .setCallbackContext(context)
+                            .setTaskHelper(new TaskHelperImpl());
+                } catch (InstantiationException | IllegalAccessException e) {
+                    e.printStackTrace();
+                    return null;
+                }
+            }
+        };
+
+        TaskDef taskDef = taskClass.getAnnotation(TaskDef.class);
+        taskRegistry.put(taskDef.name(), taskFac);
+
+        return taskRegistry;
+    }
+
+    public void run() {
+        ZkClient zkClient = null;
+        try {
+            zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                    ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+            ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
+
+            List<String> nodesInCluster = zkHelixAdmin.getInstancesInCluster(clusterName);
+
+            if (!nodesInCluster.contains(participantName)) {
+                InstanceConfig instanceConfig = new InstanceConfig(participantName);
+                instanceConfig.setHostName("localhost");
+                instanceConfig.setInstanceEnabled(true);
+                if (taskTypeName != null) {
+                    instanceConfig.addTag(taskTypeName);
+                }
+                zkHelixAdmin.addInstance(clusterName, instanceConfig);
+                logger.debug("Instance: " + participantName + ", has been added to cluster: " + clusterName);
+            } else {
+                if (taskTypeName != null) {
+                    zkHelixAdmin.addInstanceTag(clusterName, participantName, taskTypeName);
+                }
+            }
+
+            Runtime.getRuntime().addShutdownHook(
+                    new Thread() {
+                        @Override
+                        public void run() {
+                            logger.debug("Participant: " + participantName + ", shutdown hook called.");
+                            disconnect();
+                        }
+                    }
+            );
+
+            // connect the participant manager
+            connect();
+        } catch (Exception ex) {
+            logger.error("Error in run() for Participant: " + participantName + ", reason: " + ex, ex);
+        } finally {
+            if (zkClient != null) {
+                zkClient.close();
+            }
+        }
+    }
+
+    private void connect() {
+        try {
+            zkHelixManager = new ZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, zkAddress);
+            // register online-offline model
+            StateMachineEngine machineEngine = zkHelixManager.getStateMachineEngine();
+            OnlineOfflineStateModelFactory factory = new OnlineOfflineStateModelFactory(participantName);
+            machineEngine.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), factory);
+
+            // register task model
+            machineEngine.registerStateModelFactory("Task", new TaskStateModelFactory(zkHelixManager, getTaskFactory()));
+            logger.debug("Participant: " + participantName + ", registered state model factories.");
+
+            zkHelixManager.connect();
+            logger.info("Participant: " + participantName + ", has connected to cluster: " + clusterName);
+
+            Thread.currentThread().join();
+        } catch (InterruptedException ex) {
+            logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex);
+        }
+        catch (Exception ex) {
+            logger.error("Error in connect() for Participant: " + participantName + ", reason: " + ex, ex);
+        } finally {
+            disconnect();
+        }
+    }
+
+    private void disconnect() {
+        if (zkHelixManager != null) {
+            logger.info("Participant: " + participantName + ", has disconnected from cluster: " + clusterName);
+            zkHelixManager.disconnect();
+        }
+    }
+
+    public PropertyResolver getPropertyResolver() {
+        return propertyResolver;
+    }
+}
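
A sketch of how a dedicated participant could be started for a single task type (illustration only, not part of this diff). It uses the hypothetical EchoTask from the earlier sketch and assumes an application.properties that also defines participant.name, which the constructor reads but the sample properties file above does not include:

    import org.apache.airavata.helix.core.participant.HelixParticipant;

    public class EchoParticipantLauncher {
        public static void main(String[] args) throws Exception {
            // "Echo Task" must match the @TaskDef name so that tagged jobs are routed to this participant
            HelixParticipant<EchoTask> participant =
                    new HelixParticipant<>("application.properties", EchoTask.class, "Echo Task");
            new Thread(participant).start();
        }
    }
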
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
new file mode 100644
index 0000000..87a1e17
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
@@ -0,0 +1,47 @@
+package org.apache.airavata.helix.core.support;
+
+import org.apache.airavata.agents.api.*;
+import org.apache.airavata.helix.task.api.support.AdaptorSupport;
+
+import java.io.File;
+
+/**
+ * Singleton {@link AdaptorSupport} implementation that resolves agent adaptors through the {@link AgentStore}
+ * and delegates remote command execution, directory creation, and file copies to them.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class AdaptorSupportImpl implements AdaptorSupport {
+
+    private static AdaptorSupportImpl INSTANCE;
+
+    private final AgentStore agentStore = new AgentStore();
+
+    private AdaptorSupportImpl() {}
+
+    public synchronized static AdaptorSupportImpl getInstance() {
+        if (INSTANCE == null) {
+            INSTANCE = new AdaptorSupportImpl();
+        }
+        return INSTANCE;
+    }
+
+    public void initializeAdaptor() {
+    }
+
+    public CommandOutput executeCommand(String command, String workingDirectory, String computeResourceId, String protocol, String authToken) throws AgentException {
+        return fetchAdaptor(computeResourceId, protocol, authToken).executeCommand(command, workingDirectory);
+    }
+
+    public void createDirectory(String path, String computeResourceId, String protocol, String authToken) throws AgentException {
+        fetchAdaptor(computeResourceId, protocol, authToken).createDirectory(path);
+    }
+
+    public void copyFile(String sourceFile, String destinationFile, String computeResourceId, String protocol, String authToken) throws AgentException {
+        fetchAdaptor(computeResourceId, protocol, authToken).copyFile(sourceFile, destinationFile);
+    }
+
+    public AgentAdaptor fetchAdaptor(String computeResource, String protocol, String authToken) throws AgentException {
+         return agentStore.fetchAdaptor(computeResource, protocol, authToken);
+    }
+}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java
new file mode 100644
index 0000000..77fc5ce
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java
@@ -0,0 +1,16 @@
+package org.apache.airavata.helix.core.support;
+
+import org.apache.airavata.helix.task.api.TaskHelper;
+
+/**
+ * Default {@link TaskHelper} implementation that exposes the shared {@link AdaptorSupportImpl} instance to tasks.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class TaskHelperImpl implements TaskHelper {
+
+    public AdaptorSupportImpl getAdaptorSupport() {
+        return AdaptorSupportImpl.getInstance();
+    }
+}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/PropertyResolver.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/PropertyResolver.java
new file mode 100644
index 0000000..4532345
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/PropertyResolver.java
@@ -0,0 +1,44 @@
+package org.apache.airavata.helix.core.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Small helper for loading configuration properties from a file or input stream, with support for overriding
+ * individual keys through environment variables.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class PropertyResolver {
+    private Properties properties = new Properties();
+
+    public void loadFromFile(File propertyFile) throws IOException {
+        properties = new Properties();
+        properties.load(new FileInputStream(propertyFile));
+    }
+
+    public void loadInputStream(InputStream inputStream) throws IOException {
+        properties = new Properties();
+        properties.load(inputStream);
+    }
+
+    public String get(String key) {
+        if (properties.containsKey(key)) {
+            // An environment variable named after the key (dots replaced by underscores) overrides the file value
+            String envValue = System.getenv(key.replace(".", "_"));
+            return envValue != null ? envValue : properties.getProperty(key);
+        } else {
+            return null;
+        }
+    }
+
+    public String get(String key, String defaultValue) {
+        return Optional.ofNullable(get(key)).orElse(defaultValue);
+    }
+}
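
A short sketch of the resolution order (illustration only, not part of this diff), using the keys from the sample application.properties above; note that the environment override only applies to keys that are present in the loaded file:

    import org.apache.airavata.helix.core.util.PropertyResolver;

    public class PropertyResolverExample {
        public static void main(String[] args) throws Exception {
            PropertyResolver resolver = new PropertyResolver();
            resolver.loadInputStream(PropertyResolverExample.class.getClassLoader()
                    .getResourceAsStream("application.properties"));

            // Returns zookeeper.connection.url from the file, unless an environment variable
            // named "zookeeper_connection_url" is set, in which case that value wins
            String zkUrl = resolver.get("zookeeper.connection.url", "localhost:2181");
            System.out.println("Connecting to ZooKeeper at " + zkUrl);
        }
    }
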
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
new file mode 100644
index 0000000..d0f1ab6
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
@@ -0,0 +1,103 @@
+package org.apache.airavata.helix.core.util;
+
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
+import org.apache.airavata.helix.task.api.annotation.TaskParam;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reflection utilities for reading a task's out ports and for serializing/deserializing its annotated
+ * parameters to and from the Helix task configuration map.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class TaskUtil {
+
+    public static <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T task) throws IllegalAccessException {
+        Field[] fields = task.getClass().getDeclaredFields();
+        List<OutPort> outPorts = new ArrayList<>();
+        for (Field field : fields) {
+            TaskOutPort outPortAnnotation = field.getAnnotation(TaskOutPort.class);
+            if (outPortAnnotation != null) {
+                field.setAccessible(true);
+                OutPort outPort = (OutPort) field.get(task);
+                outPorts.add(outPort);
+            }
+        }
+        return outPorts;
+    }
+
+    public static <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException {
+
+        Map<String, String> result = new HashMap<>();
+        for (Class<?> c = data.getClass(); c != null; c = c.getSuperclass()) {
+            Field[] fields = c.getDeclaredFields();
+            for (Field classField : fields) {
+                TaskParam param = classField.getAnnotation(TaskParam.class);
+                if (param != null) {
+                    classField.setAccessible(true);
+                    Object value = classField.get(data);
+                    if (value != null) {
+                        result.put(param.name(), value.toString());
+                    }
+                }
+
+                TaskOutPort outPortAnnotation = classField.getAnnotation(TaskOutPort.class);
+                if (outPortAnnotation != null) {
+                    classField.setAccessible(true);
+                    OutPort outPort = (OutPort) classField.get(data);
+                    if (outPort != null && outPort.getNextJobId() != null) {
+                        result.put(outPortAnnotation.name(), outPort.getNextJobId());
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    public static <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, InstantiationException {
+
+        List<Field> allFields = new ArrayList<>();
+        Class<?> genericClass = instance.getClass();
+
+        while (AbstractTask.class.isAssignableFrom(genericClass)) {
+            Field[] declaredFields = genericClass.getDeclaredFields();
+            for (Field declaredField : declaredFields) {
+                allFields.add(declaredField);
+            }
+            genericClass = genericClass.getSuperclass();
+        }
+
+        for (Field classField : allFields) {
+            TaskParam param = classField.getAnnotation(TaskParam.class);
+            if (param != null) {
+                if (params.containsKey(param.name())) {
+                    classField.setAccessible(true);
+                    if (classField.getType().isAssignableFrom(String.class)) {
+                        classField.set(instance, params.get(param.name()));
+                    } else if (classField.getType().isAssignableFrom(Integer.class)) {
+                        classField.set(instance, Integer.parseInt(params.get(param.name())));
+                    } else if (classField.getType().isAssignableFrom(Long.class)) {
+                        classField.set(instance, Long.parseLong(params.get(param.name())));
+                    } else if (classField.getType().isAssignableFrom(Boolean.class)) {
+                        classField.set(instance, Boolean.parseBoolean(params.get(param.name())));
+                    }
+                }
+            }
+        }
+
+        for (Field classField : allFields) {
+            TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
+            if (outPort != null) {
+                classField.setAccessible(true);
+                if (params.containsKey(outPort.name())) {
+                    classField.set(instance, new OutPort(params.get(outPort.name()), instance));
+                } else {
+                    classField.set(instance, new OutPort(null, instance));
+                }
+            }
+        }
+    }
+}
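
A sketch of the round trip these helpers perform (illustration only, not part of this diff), again using the hypothetical EchoTask from the AbstractTask sketch; Helix carries the serialized map in the task configuration and the participant-side instance is repopulated from it:

    import org.apache.airavata.helix.core.util.TaskUtil;

    import java.util.Map;

    public class TaskUtilExample {
        public static void main(String[] args) throws Exception {
            EchoTask original = new EchoTask();
            original.setTaskId("task-1");
            original.setMessage("hello");

            // @TaskParam fields (including taskId from AbstractTask) become plain string entries keyed by annotation name
            Map<String, String> serialized = TaskUtil.serializeTaskData(original);

            // On the participant side the same map comes back via the task config; deserialization repopulates the fields
            EchoTask copy = new EchoTask();
            TaskUtil.deserializeTaskData(copy, serialized);
            System.out.println("Restored task id: " + copy.getTaskId());
        }
    }
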
diff --git a/modules/airavata-helix/workflow-impl/pom.xml b/modules/airavata-helix/workflow-impl/pom.xml
new file mode 100644
index 0000000..03f324e
--- /dev/null
+++ b/modules/airavata-helix/workflow-impl/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata-helix</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>workflow-impl</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>task-core</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>task-api</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+<!--
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+-->
+
+</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/SimpleWorkflow.java b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/SimpleWorkflow.java
new file mode 100644
index 0000000..d212f91
--- /dev/null
+++ b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/SimpleWorkflow.java
@@ -0,0 +1,40 @@
+package org.apache.airavata.helix.workflow;
+
+/**
+ * Example driver that wires a few tasks into a workflow and launches it through the WorkflowManager; kept as
+ * commented-out sample code because the referenced tasks are not part of this module.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class SimpleWorkflow {
+
+    public static void main(String[] args) throws Exception {
+        WorkflowManager wm = new WorkflowManager("AiravataDemoCluster", "WorkflowManager", "localhost:2199");
+
+        /*MkdirTask mkdirTask1 = new MkdirTask();
+        mkdirTask1.setDirName("/tmp/newdir");
+        mkdirTask1.setComputeResourceId("localhost");
+        mkdirTask1.setTaskId("task1");
+
+        MkdirTask mkdirTask2 = new MkdirTask();
+        mkdirTask2.setDirName("/tmp/newdir2");
+        mkdirTask2.setComputeResourceId("localhost");
+        mkdirTask2.setTaskId("task2");
+
+        CommandTask commandTask1 = new CommandTask();
+        commandTask1.setCommand("touch /tmp/newdir/a.txt");
+        commandTask1.setWorkingDirectory("/tmp");
+        commandTask1.setComputeResource("localhost");
+        commandTask1.setTaskId("task3");
+
+        mkdirTask1.setSuccessPort(new OutPort("task2", mkdirTask1));
+        mkdirTask2.setSuccessPort(new OutPort("task3", mkdirTask2));
+
+        List<AbstractTask> allTasks = new ArrayList<>();
+        allTasks.add(mkdirTask2);
+        allTasks.add(mkdirTask1);
+        allTasks.add(commandTask1);
+
+        wm.launchWorkflow(UUID.randomUUID().toString(), allTasks, true);*/
+    }
+}
diff --git a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
new file mode 100644
index 0000000..ab7e3c4
--- /dev/null
+++ b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
@@ -0,0 +1,94 @@
+package org.apache.airavata.helix.workflow;
+
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.core.util.TaskUtil;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Builds Helix task workflows from a list of annotated Airavata tasks, wires parent/child dependencies from
+ * their out ports, and submits the workflow through the TaskDriver.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class WorkflowManager {
+
+    private static final String WORKFLOW_PREFIX = "Workflow_of_process_";
+    private TaskDriver taskDriver;
+
+    public WorkflowManager(String helixClusterName, String instanceName, String zkConnectionString) throws Exception {
+
+        HelixManager helixManager = HelixManagerFactory.getZKHelixManager(helixClusterName, instanceName,
+                InstanceType.SPECTATOR, zkConnectionString);
+        helixManager.connect();
+
+        Runtime.getRuntime().addShutdownHook(
+                new Thread() {
+                    @Override
+                    public void run() {
+                        helixManager.disconnect();
+                    }
+                }
+        );
+
+        taskDriver = new TaskDriver(helixManager);
+    }
+
+    public void launchWorkflow(String processId, List<AbstractTask> tasks, boolean globalParticipant) throws Exception {
+
+        Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW_PREFIX + processId).setExpiry(0);
+
+        for (int i = 0; i < tasks.size(); i++) {
+            AbstractTask data = tasks.get(i);
+            String taskType = data.getClass().getAnnotation(TaskDef.class).name();
+            TaskConfig.Builder taskBuilder = new TaskConfig.Builder().setTaskId("Task_" + data.getTaskId())
+                    .setCommand(taskType);
+            Map<String, String> paramMap = TaskUtil.serializeTaskData(data);
+            paramMap.forEach(taskBuilder::addConfig);
+
+            List<TaskConfig> taskBuilds = new ArrayList<>();
+            taskBuilds.add(taskBuilder.build());
+
+            JobConfig.Builder job = new JobConfig.Builder()
+                    .addTaskConfigs(taskBuilds)
+                    .setFailureThreshold(0)
+                    .setMaxAttemptsPerTask(3);
+
+            if (!globalParticipant) {
+                job.setInstanceGroupTag(taskType);
+            }
+
+            workflowBuilder.addJob(data.getTaskId(), job);
+
+            List<OutPort> outPorts = TaskUtil.getOutPortsOfTask(data);
+            outPorts.forEach(outPort -> {
+                if (outPort != null) {
+                    workflowBuilder.addParentChildDependency(data.getTaskId(), outPort.getNextJobId());
+                }
+            });
+        }
+
+        WorkflowConfig.Builder config = new WorkflowConfig.Builder().setFailureThreshold(0);
+        workflowBuilder.setWorkflowConfig(config.build());
+        Workflow workflow = workflowBuilder.build();
+
+        taskDriver.start(workflow);
+
+        //TODO : Do we need to monitor the workflow status? If so, how do we do it in a scalable manner? For example,
+        // if the hfac instance that monitors a particular workflow is killed for some reason, who takes over that responsibility?
+
+        TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(),
+                TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED);
+        System.out.println("Workflow finished with state " + taskState.name());
+
+    }
+}
\ No newline at end of file
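
A sketch of launching a two-task workflow through this manager (illustration only, not part of this diff), based on the commented-out example in SimpleWorkflow but using the hypothetical EchoTask from the earlier sketch; the cluster name and ZooKeeper address mirror the sample application.properties:

    import org.apache.airavata.helix.core.AbstractTask;
    import org.apache.airavata.helix.core.OutPort;
    import org.apache.airavata.helix.workflow.WorkflowManager;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    public class WorkflowLauncherExample {
        public static void main(String[] args) throws Exception {
            WorkflowManager manager = new WorkflowManager("AiravataDemoCluster", "WorkflowManager", "localhost:2199");

            EchoTask first = new EchoTask();
            first.setTaskId("task1");
            first.setMessage("first");

            EchoTask second = new EchoTask();
            second.setTaskId("task2");
            second.setMessage("second");

            // Wire task1 -> task2 through the out port; it becomes a parent/child dependency in the Helix workflow
            first.setSuccessPort(new OutPort("task2", first));

            List<AbstractTask> tasks = new ArrayList<>();
            tasks.add(first);
            tasks.add(second);

            // true = run on a participant that registers all task types (see GlobalParticipant below)
            manager.launchWorkflow(UUID.randomUUID().toString(), tasks, true);
        }
    }
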
diff --git a/modules/helix-spectator/pom.xml b/modules/helix-spectator/pom.xml
new file mode 100644
index 0000000..bae2785
--- /dev/null
+++ b/modules/helix-spectator/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>helix-spectator</artifactId>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>task-core</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-registry-cpi</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-registry-core</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-messaging-core</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>workflow-impl</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-client -->
+        <dependency>
+            <groupId>org.mariadb.jdbc</groupId>
+            <artifactId>mariadb-java-client</artifactId>
+            <version>1.1.7</version>
+        </dependency>
+
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
new file mode 100644
index 0000000..f0e166b
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -0,0 +1,68 @@
+package org.apache.airavata.helix.impl.participant;
+
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.core.participant.HelixParticipant;
+import org.apache.airavata.helix.core.support.TaskHelperImpl;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class GlobalParticipant extends HelixParticipant {
+
+    private String[] taskClasses = {
+        "org.apache.airavata.helix.impl.task.EnvSetupTask",
+        "org.apache.airavata.helix.impl.task.DataStagingTask",
+        "org.apache.airavata.helix.impl.task.submission.task.ForkJobSubmissionTask",
+        "org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask",
+        "org.apache.airavata.helix.impl.task.submission.task.LocalJobSubmissionTask"
+    };
+
+    public Map<String, TaskFactory> getTaskFactory() {
+        Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+
+        for (String taskClass : taskClasses) {
+            TaskFactory taskFac = new TaskFactory() {
+                public Task createNewTask(TaskCallbackContext context) {
+                    try {
+                        return AbstractTask.class.cast(Class.forName(taskClass).newInstance())
+                                .setCallbackContext(context)
+                                .setTaskHelper(new TaskHelperImpl());
+                    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+                        e.printStackTrace();
+                        return null;
+                    }
+                }
+            };
+
+            try {
+                TaskDef taskDef = Class.forName(taskClass).getAnnotation(TaskDef.class);
+                taskRegistry.put(taskDef.name(), taskFac);
+            } catch (ClassNotFoundException e) {
+                e.printStackTrace();
+            }
+        }
+
+
+        return taskRegistry;
+    }
+
+    public GlobalParticipant(String propertyFile, Class taskClass, String taskTypeName) throws IOException {
+        super(propertyFile, taskClass, taskTypeName);
+    }
+
+    public static void main(String args[]) throws IOException {
+        GlobalParticipant participant = new GlobalParticipant("application.properties", null, null);
+        Thread t = new Thread(participant);
+        t.start();
+    }
+
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
new file mode 100644
index 0000000..72d3e17
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -0,0 +1,293 @@
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
+import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.helix.HelixManager;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.*;
+
+public abstract class AiravataTask extends AbstractTask {
+
+    private static final Logger logger = LogManager.getLogger(AiravataTask.class);
+
+    private AppCatalog appCatalog;
+    private ExperimentCatalog experimentCatalog;
+    private Publisher statusPublisher;
+    private ProcessModel processModel;
+
+    private ComputeResourceDescription computeResourceDescription;
+    private ComputeResourcePreference gatewayComputeResourcePreference;
+    private UserComputeResourcePreference userComputeResourcePreference;
+    private UserResourceProfile userResourceProfile;
+    private GatewayResourceProfile gatewayResourceProfile;
+
+    @TaskParam(name = "Process Id")
+    private String processId;
+
+    @TaskParam(name = "experimentId")
+    private String experimentId;
+
+    @TaskParam(name = "gatewayId")
+    private String gatewayId;
+
+    @TaskOutPort(name = "Success Port")
+    private OutPort onSuccess;
+
+
+    protected TaskResult onSuccess(String message) {
+        String successMessage = "Task " + getTaskId() + " completed." + (message != null ? " Message : " + message : "");
+        logger.info(successMessage);
+        return onSuccess.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
+    }
+
+    protected TaskResult onFail(String reason, boolean fatal, Throwable error) {
+        String errorMessage;
+
+        if (error == null) {
+            errorMessage = "Task " + getTaskId() + " failed due to " + reason;
+            logger.error(errorMessage);
+        } else {
+            errorMessage = "Task " + getTaskId() + " failed due to " + reason + ", " + error.getMessage();
+            logger.error(errorMessage, error);
+        }
+        return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, errorMessage);
+
+    }
+
+    @Override
+    public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
+        super.init(manager, workflowName, jobName, taskName);
+        try {
+            appCatalog = RegistryFactory.getAppCatalog();
+            experimentCatalog = RegistryFactory.getDefaultExpCatalog();
+            processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
+
+            this.computeResourceDescription = getAppCatalog().getComputeResource().getComputeResource(getProcessModel()
+                    .getComputeResourceId());
+            this.gatewayComputeResourcePreference = getAppCatalog().getGatewayProfile()
+                    .getComputeResourcePreference(getGatewayId(), computeResourceDescription.getComputeResourceId());
+
+            this.userComputeResourcePreference = getAppCatalog().getUserResourceProfile()
+                    .getUserComputeResourcePreference(getProcessModel().getUserName(), getGatewayId(), getProcessModel()
+                            .getComputeResourceId());
+
+            this.userResourceProfile = getAppCatalog().getUserResourceProfile()
+                    .getUserResourceProfile(getProcessModel().getUserName(), getGatewayId());
+
+            this.gatewayResourceProfile = getAppCatalog().getGatewayProfile().getGatewayProfile(getGatewayId());
+
+        } catch (AppCatalogException | RegistryException e) {
+            logger.error("Failed to initialize task " + getTaskId() + " using the app and experiment catalogs", e);
+        }
+    }
+
+    protected AppCatalog getAppCatalog() {
+        return appCatalog;
+    }
+
+    protected void publishTaskState(TaskState ts) throws RegistryException {
+
+        TaskStatus taskStatus = new TaskStatus();
+        taskStatus.setState(ts);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, taskStatus, getTaskId());
+        TaskIdentifier identifier = new TaskIdentifier(getTaskId(),
+                getProcessId(), getExperimentId(), getGatewayId());
+        TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(ts,
+                identifier);
+        MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
+                (MessageType.TASK.name()), getGatewayId());
+        msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
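+        // TODO: the msgCtx built above is never published; wire in the statusPublisher to actually emit this status change event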
+    }
+
+
+    ///////////////////
+
+    public String getComputeResourceId() {
+        if (isUseUserCRPref() &&
+                userComputeResourcePreference != null &&
+                isValid(userComputeResourcePreference.getComputeResourceId())) {
+            return userComputeResourcePreference.getComputeResourceId();
+        } else {
+            return gatewayComputeResourcePreference.getComputeResourceId();
+        }
+    }
+
+    public String getComputeResourceCredentialToken(){
+        if (isUseUserCRPref()) {
+            if (userComputeResourcePreference != null &&
+                    isValid(userComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
+                return userComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+            } else {
+                return userResourceProfile.getCredentialStoreToken();
+            }
+        } else {
+            if (isValid(gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
+                return gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+            } else {
+                return gatewayResourceProfile.getCredentialStoreToken();
+            }
+        }
+    }
+
+    public String getComputeResourceLoginUserName(){
+        if (isUseUserCRPref() &&
+                userComputeResourcePreference != null &&
+                isValid(userComputeResourcePreference.getLoginUserName())) {
+            return userComputeResourcePreference.getLoginUserName();
+        } else if (isValid(getProcessModel().getProcessResourceSchedule().getOverrideLoginUserName())) {
+            return getProcessModel().getProcessResourceSchedule().getOverrideLoginUserName();
+        } else {
+            return gatewayComputeResourcePreference.getLoginUserName();
+        }
+    }
+
+    public JobSubmissionInterface getPreferredJobSubmissionInterface() throws AppCatalogException {
+        JobSubmissionProtocol preferredJobSubmissionProtocol = getJobSubmissionProtocol();
+        ComputeResourceDescription resourceDescription = getComputeResourceDescription();
+        List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces();
+        if (jobSubmissionInterfaces == null || jobSubmissionInterfaces.isEmpty()) {
+            throw new AppCatalogException("Compute resource should have at least one job submission interface defined");
+        }
+
+        // Prefer interfaces matching the preferred protocol (when one is configured); otherwise consider all of them
+        List<JobSubmissionInterface> candidates = new ArrayList<>();
+        if (preferredJobSubmissionProtocol != null) {
+            for (JobSubmissionInterface submissionInterface : jobSubmissionInterfaces) {
+                if (preferredJobSubmissionProtocol == submissionInterface.getJobSubmissionProtocol()) {
+                    candidates.add(submissionInterface);
+                }
+            }
+        }
+        if (candidates.isEmpty()) {
+            candidates = new ArrayList<>(jobSubmissionInterfaces);
+        }
+
+        // Pick the interface with the lowest priority order value
+        candidates.sort(Comparator.comparingInt(JobSubmissionInterface::getPriorityOrder));
+        return candidates.get(0);
+    }
+
+    //////////////////////////
+
+
+    protected boolean isValid(String str) {
+        return str != null && !str.trim().isEmpty();
+    }
+
+    public boolean isUseUserCRPref() {
+        return getProcessModel().isUseUserCRPref();
+    }
+
+    public JobSubmissionProtocol getJobSubmissionProtocol() {
+        return getGatewayComputeResourcePreference().getPreferredJobSubmissionProtocol();
+    }
+
+    public ComputeResourcePreference getGatewayComputeResourcePreference() {
+        return gatewayComputeResourcePreference;
+    }
+
+
+    public ComputeResourceDescription getComputeResourceDescription() {
+        return computeResourceDescription;
+    }
+
+    ////////////////////////
+
+    
+    public void setAppCatalog(AppCatalog appCatalog) {
+        this.appCatalog = appCatalog;
+    }
+
+    public ExperimentCatalog getExperimentCatalog() {
+        return experimentCatalog;
+    }
+
+    public void setExperimentCatalog(ExperimentCatalog experimentCatalog) {
+        this.experimentCatalog = experimentCatalog;
+    }
+
+    public Publisher getStatusPublisher() {
+        return statusPublisher;
+    }
+
+    public void setStatusPublisher(Publisher statusPublisher) {
+        this.statusPublisher = statusPublisher;
+    }
+
+    public String getProcessId() {
+        return processId;
+    }
+
+    public void setProcessId(String processId) {
+        this.processId = processId;
+    }
+
+    public String getExperimentId() {
+        return experimentId;
+    }
+
+    public void setExperimentId(String experimentId) {
+        this.experimentId = experimentId;
+    }
+
+    public String getGatewayId() {
+        return gatewayId;
+    }
+
+    public void setGatewayId(String gatewayId) {
+        this.gatewayId = gatewayId;
+    }
+
+    public ProcessModel getProcessModel() {
+        return processModel;
+    }
+
+    public void setProcessModel(ProcessModel processModel) {
+        this.processModel = processModel;
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java
new file mode 100644
index 0000000..346aa73
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java
@@ -0,0 +1,19 @@
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.task.TaskResult;
+
+@TaskDef(name = "Data Staging Task")
+public class DataStagingTask extends AiravataTask {
+
+    @Override
+    public TaskResult onRun(TaskHelper taskHelper) {
+        // TODO data staging logic is not implemented yet; completing as a no-op so that workflows can proceed
+        return new TaskResult(TaskResult.Status.COMPLETED, "Data staging task is not implemented yet");
+    }
+
+    @Override
+    public void onCancel() {
+
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
new file mode 100644
index 0000000..1cab0e2
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
@@ -0,0 +1,64 @@
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
+import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.helix.task.TaskResult;
+
+@TaskDef(name = "Environment Setup Task")
+public class EnvSetupTask extends AiravataTask {
+
+    @TaskParam(name = "Working Directory")
+    private String workingDirectory;
+
+    @TaskOutPort(name = "Success Out Port")
+    private OutPort successPort;
+
+    @Override
+    public TaskResult onRun(TaskHelper taskHelper) {
+        try {
+            publishTaskState(TaskState.EXECUTING);
+            AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(),
+                    getJobSubmissionProtocol().name(), getComputeResourceCredentialToken());
+
+            adaptor.createDirectory(workingDirectory);
+            publishTaskState(TaskState.COMPLETED);
+            return successPort.invoke(new TaskResult(TaskResult.Status.COMPLETED, "Successfully completed"));
+        } catch (Exception e) {
+            try {
+                publishTaskState(TaskState.FAILED);
+            } catch (RegistryException e1) {
+                publishErrors(e1);
+                // ignore silently
+            }
+            publishErrors(e);
+            return new TaskResult(TaskResult.Status.FAILED, "Failed the task");
+        }
+    }
+
+    @Override
+    public void onCancel() {
+
+    }
+
+    public String getWorkingDirectory() {
+        return workingDirectory;
+    }
+
+    public void setWorkingDirectory(String workingDirectory) {
+        this.workingDirectory = workingDirectory;
+    }
+
+    public OutPort getSuccessPort() {
+        return successPort;
+    }
+
+    public void setSuccessPort(OutPort successPort) {
+        this.successPort = successPort;
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
new file mode 100644
index 0000000..ec75fb7
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
@@ -0,0 +1,415 @@
+package org.apache.airavata.helix.impl.task.submission;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GroovyMapData {
+
+    @ScriptTag(name = "inputDir")
+    private String inputDir;
+
+    @ScriptTag(name = "outputDir")
+    private String outputDir;
+
+    @ScriptTag(name = "executablePath")
+    private String executablePath;
+
+    @ScriptTag(name = "standardOutFile")
+    private String stdoutFile;
+
+    @ScriptTag(name = "standardErrorFile")
+    private String stderrFile;
+
+    @ScriptTag(name = "scratchLocation")
+    private String scratchLocation;
+
+    @ScriptTag(name = "gatewayId")
+    private String gatewayId;
+
+    @ScriptTag(name = "gatewayUserName")
+    private String gatewayUserName;
+
+    @ScriptTag(name = "applicationName")
+    private String applicationName;
+
+    @ScriptTag(name = "queueSpecificMacros")
+    private String queueSpecificMacros;
+
+    @ScriptTag(name = "accountString")
+    private String accountString;
+
+    @ScriptTag(name = "reservation")
+    private String reservation;
+
+    @ScriptTag(name = "jobName")
+    private String jobName;
+
+    @ScriptTag(name = "workingDirectory")
+    private String workingDirectory;
+
+    @ScriptTag(name = "inputs")
+    private List<String> inputs;
+
+    @ScriptTag(name = "inputsAll")
+    private List<String> inputsAll;
+
+    @ScriptTag(name = "userName")
+    private String userName;
+
+    @ScriptTag(name = "shellName")
+    private String shellName;
+
+    @ScriptTag(name = "maxWallTime")
+    private String maxWallTime;
+
+    @ScriptTag(name = "qualityOfService")
+    private String qualityOfService;
+
+    @ScriptTag(name = "queueName")
+    private String queueName;
+
+    @ScriptTag(name = "nodes")
+    private Integer nodes;
+
+    @ScriptTag(name = "processPerNode")
+    private Integer processPerNode;
+
+    @ScriptTag(name = "cpuCount")
+    private Integer cpuCount;
+
+    @ScriptTag(name = "usedMem")
+    private Integer usedMem;
+
+    @ScriptTag(name = "mailAddress")
+    private String mailAddress;
+
+    @ScriptTag(name = "exports")
+    private List<String> exports;
+
+    @ScriptTag(name = "moduleCommands")
+    private List<String> moduleCommands;
+
+    @ScriptTag(name = "preJobCommands")
+    private List<String> preJobCommands;
+
+    @ScriptTag(name = "postJobCommands")
+    private List<String> postJobCommands;
+
+    @ScriptTag(name = "jobSubmitterCommand")
+    private String jobSubmitterCommand;
+
+    @ScriptTag(name = "chassisName")
+    private String chassisName;
+
+
+    public Map<String, Object> getMap() {
+
+        Map<String, Object> map = new HashMap<>();
+        Field[] fields = this.getClass().getDeclaredFields();
+
+        for (Field field : fields) {
+            ScriptTag scriptTag = field.getAnnotation(ScriptTag.class);
+            if (scriptTag != null) {
+                field.setAccessible(true);
+                try {
+                    map.put(scriptTag.name(), field.get(this));
+                } catch (IllegalAccessException e) {
+                    // should not happen since the field was made accessible above; skip this entry
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        return map;
+    }
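+
+    // Illustrative sketch only (not part of this commit): the map built above is intended to be
+    // bound into one of the *_Groovy.template job scripts. Assuming Groovy's GStringTemplateEngine
+    // and a SLURM template on disk, rendering would look roughly like:
+    //
+    //   Map<String, Object> binding = new GroovyMapData()
+    //           .setJobName("test-job").setQueueName("normal").setMaxWallTime("00:30:00").getMap();
+    //   groovy.text.Template template = new groovy.text.GStringTemplateEngine()
+    //           .createTemplate(new java.io.File("SLURM_Groovy.template"));
+    //   String jobScript = template.make(binding).toString();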
+
+    public String getInputDir() {
+        return inputDir;
+    }
+
+    public GroovyMapData setInputDir(String inputDir) {
+        this.inputDir = inputDir;
+        return this;
+    }
+
+    public String getOutputDir() {
+        return outputDir;
+    }
+
+    public GroovyMapData setOutputDir(String outputDir) {
+        this.outputDir = outputDir;
+        return this;
+    }
+
+    public String getExecutablePath() {
+        return executablePath;
+    }
+
+    public GroovyMapData setExecutablePath(String executablePath) {
+        this.executablePath = executablePath;
+        return this;
+    }
+
+    public String getStdoutFile() {
+        return stdoutFile;
+    }
+
+    public GroovyMapData setStdoutFile(String stdoutFile) {
+        this.stdoutFile = stdoutFile;
+        return this;
+    }
+
+    public String getStderrFile() {
+        return stderrFile;
+    }
+
+    public GroovyMapData setStderrFile(String stderrFile) {
+        this.stderrFile = stderrFile;
+        return this;
+    }
+
+    public String getScratchLocation() {
+        return scratchLocation;
+    }
+
+    public GroovyMapData setScratchLocation(String scratchLocation) {
+        this.scratchLocation = scratchLocation;
+        return this;
+    }
+
+    public String getGatewayId() {
+        return gatewayId;
+    }
+
+    public GroovyMapData setGatewayId(String gatewayId) {
+        this.gatewayId = gatewayId;
+        return this;
+    }
+
+    public String getGatewayUserName() {
+        return gatewayUserName;
+    }
+
+    public GroovyMapData setGatewayUserName(String gatewayUserName) {
+        this.gatewayUserName = gatewayUserName;
+        return this;
+    }
+
+    public String getApplicationName() {
+        return applicationName;
+    }
+
+    public GroovyMapData setApplicationName(String applicationName) {
+        this.applicationName = applicationName;
+        return this;
+    }
+
+    public String getQueueSpecificMacros() {
+        return queueSpecificMacros;
+    }
+
+    public GroovyMapData setQueueSpecificMacros(String queueSpecificMacros) {
+        this.queueSpecificMacros = queueSpecificMacros;
+        return this;
+    }
+
+    public String getAccountString() {
+        return accountString;
+    }
+
+    public GroovyMapData setAccountString(String accountString) {
+        this.accountString = accountString;
+        return this;
+    }
+
+    public String getReservation() {
+        return reservation;
+    }
+
+    public GroovyMapData setReservation(String reservation) {
+        this.reservation = reservation;
+        return this;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public GroovyMapData setJobName(String jobName) {
+        this.jobName = jobName;
+        return this;
+    }
+
+    public String getWorkingDirectory() {
+        return workingDirectory;
+    }
+
+    public GroovyMapData setWorkingDirectory(String workingDirectory) {
+        this.workingDirectory = workingDirectory;
+        return this;
+    }
+
+    public List<String> getInputs() {
+        return inputs;
+    }
+
+    public GroovyMapData setInputs(List<String> inputs) {
+        this.inputs = inputs;
+        return this;
+    }
+
+    public List<String> getInputsAll() {
+        return inputsAll;
+    }
+
+    public GroovyMapData setInputsAll(List<String> inputsAll) {
+        this.inputsAll = inputsAll;
+        return this;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public GroovyMapData setUserName(String userName) {
+        this.userName = userName;
+        return this;
+    }
+
+    public String getShellName() {
+        return shellName;
+    }
+
+    public GroovyMapData setShellName(String shellName) {
+        this.shellName = shellName;
+        return this;
+    }
+
+    public String getMaxWallTime() {
+        return maxWallTime;
+    }
+
+    public GroovyMapData setMaxWallTime(String maxWallTime) {
+        this.maxWallTime = maxWallTime;
+        return this;
+    }
+
+    public String getQualityOfService() {
+        return qualityOfService;
+    }
+
+    public GroovyMapData setQualityOfService(String qualityOfService) {
+        this.qualityOfService = qualityOfService;
+        return this;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public GroovyMapData setQueueName(String queueName) {
+        this.queueName = queueName;
+        return this;
+    }
+
+    public Integer getNodes() {
+        return nodes;
+    }
+
+    public GroovyMapData setNodes(Integer nodes) {
+        this.nodes = nodes;
+        return this;
+    }
+
+    public Integer getProcessPerNode() {
+        return processPerNode;
+    }
+
+    public GroovyMapData setProcessPerNode(Integer processPerNode) {
+        this.processPerNode = processPerNode;
+        return this;
+    }
+
+    public Integer getCpuCount() {
+        return cpuCount;
+    }
+
+    public GroovyMapData setCpuCount(Integer cpuCount) {
+        this.cpuCount = cpuCount;
+        return this;
+    }
+
+    public Integer getUsedMem() {
+        return usedMem;
+    }
+
+    public GroovyMapData setUsedMem(Integer usedMem) {
+        this.usedMem = usedMem;
+        return this;
+    }
+
+    public String getMailAddress() {
+        return mailAddress;
+    }
+
+    public GroovyMapData setMailAddress(String mailAddress) {
+        this.mailAddress = mailAddress;
+        return this;
+    }
+
+    public List<String> getExports() {
+        return exports;
+    }
+
+    public GroovyMapData setExports(List<String> exports) {
+        this.exports = exports;
+        return this;
+    }
+
+    public List<String> getModuleCommands() {
+        return moduleCommands;
+    }
+
+    public GroovyMapData setModuleCommands(List<String> moduleCommands) {
+        this.moduleCommands = moduleCommands;
+        return this;
+    }
+
+    public List<String> getPreJobCommands() {
+        return preJobCommands;
+    }
+
+    public GroovyMapData setPreJobCommands(List<String> preJobCommands) {
+        this.preJobCommands = preJobCommands;
+        return this;
+    }
+
+    public List<String> getPostJobCommands() {
+        return postJobCommands;
+    }
+
+    public GroovyMapData setPostJobCommands(List<String> postJobCommands) {
+        this.postJobCommands = postJobCommands;
+        return this;
+    }
+
+    public String getJobSubmitterCommand() {
+        return jobSubmitterCommand;
+    }
+
+    public GroovyMapData setJobSubmitterCommand(String jobSubmitterCommand) {
+        this.jobSubmitterCommand = jobSubmitterCommand;
+        return this;
+    }
+
+    public String getChassisName() {
+        return chassisName;
+    }
+
+    public GroovyMapData setChassisName(String chassisName) {
+        this.chassisName = chassisName;
+        return this;
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/Script.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/Script.java
new file mode 100644
index 0000000..208e9e5
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/Script.java
@@ -0,0 +1,43 @@
+package org.apache.airavata.helix.impl.task.submission;
+
+public enum Script {
+
+    SHELL_NAME("shellName"),
+    QUEUE_NAME("queueName"),
+    NODES("nodes"),
+    CPU_COUNT("cpuCount"),
+    MAIL_ADDRESS("mailAddress"),
+    ACCOUNT_STRING("accountString"),
+    MAX_WALL_TIME("maxWallTime"),
+    JOB_NAME("jobName"),
+    STANDARD_OUT_FILE("standardOutFile"),
+    STANDARD_ERROR_FILE("standardErrorFile"),
+    QUALITY_OF_SERVICE("qualityOfService"),
+    RESERVATION("reservation"),
+    EXPORTS("exports"),
+    MODULE_COMMANDS("moduleCommands"),
+    SCRATCH_LOCATION("scratchLocation"),
+    WORKING_DIR("workingDirectory"),
+    PRE_JOB_COMMANDS("preJobCommands"),
+    JOB_SUBMITTER_COMMAND("jobSubmitterCommand"),
+    EXECUTABLE_PATH("executablePath"),
+    INPUTS("inputs"),
+    INPUTS_ALL("inputsAll"),
+    POST_JOB_COMMANDS("postJobCommands"),
+    USED_MEM("usedMem"),
+    PROCESS_PER_NODE("processPerNode"),
+    CHASSIS_NAME("chassisName"),
+    INPUT_DIR("inputDir"),
+    OUTPUT_DIR("outputDir"),
+    USER_NAME("userName"),
+    GATEWAY_ID("gatewayId"),
+    GATEWAY_USER_NAME("gatewayUserName"),
+    APPLICATION_NAME("applicationName"),
+    QUEUE_SPECIFIC_MACROS("queueSpecificMacros")
+    ;
+
+    String name;
+    Script(String name) {
+        this.name = name;
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ScriptTag.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ScriptTag.java
new file mode 100644
index 0000000..c03c11f
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ScriptTag.java
@@ -0,0 +1,13 @@
+package org.apache.airavata.helix.impl.task.submission;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
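+/**
+ * Marks a {@link GroovyMapData} field as a binding variable for the Groovy job-script templates.
+ * The {@code name()} value is used as the key in the binding map and is expected to match the
+ * placeholder name used in the corresponding *_Groovy.template file.
+ */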
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface ScriptTag {
+    public String name();
+    public boolean mandatory() default false;
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/SubmissionUtil.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/SubmissionUtil.java
new file mode 100644
index 0000000..e2cbfee
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/SubmissionUtil.java
@@ -0,0 +1,10 @@
+package org.apache.airavata.helix.impl.task.submission;
+
+import java.io.File;
+
+public class SubmissionUtil {
+
+    public static File createJobFile(GroovyMapData mapData) {
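+        // Placeholder in this commit: a full implementation would presumably render the Groovy
+        // template selected via JobFactory.getTemplateFileName(...) with mapData.getMap() as the
+        // binding and write the result to a temporary file using the job manager's script extension.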
+        return null;
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobFactory.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobFactory.java
new file mode 100644
index 0000000..b04ffd8
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobFactory.java
@@ -0,0 +1,102 @@
+package org.apache.airavata.helix.impl.task.submission.config;
+
+import org.apache.airavata.helix.impl.task.submission.config.imp.*;
+import org.apache.airavata.helix.impl.task.submission.config.imp.parser.*;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+
+public class JobFactory {
+
+    public static String getTemplateFileName(ResourceJobManagerType resourceJobManagerType) {
+        switch (resourceJobManagerType) {
+            case FORK:
+                return "FORK_Groovy.template";
+            case PBS:
+                return "PBS_Groovy.template";
+            case SLURM:
+                return "SLURM_Groovy.template";
+            case UGE:
+                return "UGE_Groovy.template";
+            case LSF:
+                return "LSF_Groovy.template";
+            case CLOUD:
+                return "CLOUD_Groovy.template";
+            default:
+                return null;
+        }
+    }
+
+    public static ResourceJobManager getResourceJobManager(AppCatalog appCatalog, JobSubmissionProtocol submissionProtocol, JobSubmissionInterface jobSubmissionInterface) {
+        try {
+            if (submissionProtocol == JobSubmissionProtocol.SSH || submissionProtocol == JobSubmissionProtocol.SSH_FORK) {
+                SSHJobSubmission sshJobSubmission = getSSHJobSubmission(appCatalog, jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (sshJobSubmission != null) {
+                    return sshJobSubmission.getResourceJobManager();
+                }
+            } else if (submissionProtocol == JobSubmissionProtocol.LOCAL) {
+                LOCALSubmission localJobSubmission = getLocalJobSubmission(appCatalog, jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (localJobSubmission != null) {
+                    return localJobSubmission.getResourceJobManager();
+                }
+            }
+        } catch (AppCatalogException e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+    public static LOCALSubmission getLocalJobSubmission(AppCatalog appCatalog, String submissionId) throws AppCatalogException {
+        try {
+            return appCatalog.getComputeResource().getLocalJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
+            throw new AppCatalogException(errorMsg, e);
+        }
+    }
+
+    public static SSHJobSubmission getSSHJobSubmission(AppCatalog appCatalog, String submissionId) throws AppCatalogException {
+        try {
+            return appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
+            throw new AppCatalogException(errorMsg, e);
+        }
+    }
+
+    public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws Exception {
+        if (resourceJobManager == null) {
+            return null;
+        }
+
+        String templateFileName = getTemplateFileName(resourceJobManager.getResourceJobManagerType());
+        switch (resourceJobManager.getResourceJobManagerType()) {
+            case PBS:
+                return new PBSJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), new PBSOutputParser());
+            case SLURM:
+                return new SlurmJobConfiguration(templateFileName, ".slurm", resourceJobManager
+                        .getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), new SlurmOutputParser());
+            case LSF:
+                return new LSFJobConfiguration(templateFileName, ".lsf", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), new LSFOutputParser());
+            case UGE:
+                return new UGEJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), new UGEOutputParser());
+            case FORK:
+                return new ForkJobConfiguration(templateFileName, ".sh", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), new ForkOutputParser());
+            // We don't have a job configuration manager for CLOUD type
+            default:
+                return null;
+        }
+
+    }
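+
+    // Illustrative usage only (assumes appCatalog and jobSubmissionInterface are resolved by the
+    // caller; variable names are hypothetical):
+    //
+    //   ResourceJobManager rjm = JobFactory.getResourceJobManager(
+    //           appCatalog, JobSubmissionProtocol.SSH, jobSubmissionInterface);
+    //   JobManagerConfiguration config = JobFactory.getJobManagerConfiguration(rjm);
+    //   RawCommandInfo submit = config.getSubmitCommand(workingDirectory, jobScriptPath);
+    //   RawCommandInfo monitor = config.getMonitorCommand(jobId);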
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobManagerConfiguration.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobManagerConfiguration.java
new file mode 100644
index 0000000..1fafb00
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobManagerConfiguration.java
@@ -0,0 +1,29 @@
+package org.apache.airavata.helix.impl.task.submission.config;
+
+public interface JobManagerConfiguration {
+
+    public RawCommandInfo getCancelCommand(String jobID);
+
+    public String getJobDescriptionTemplateName();
+
+    public RawCommandInfo getMonitorCommand(String jobID);
+
+    public RawCommandInfo getUserBasedMonitorCommand(String userName);
+
+    public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName);
+
+    public String getScriptExtension();
+
+    public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath);
+
+    public OutputParser getParser();
+
+    public String getInstalledPath();
+
+    public String getBaseCancelCommand();
+
+    public String getBaseMonitorCommand();
+
+    public String getBaseSubmitCommand();
+
+}
\ No newline at end of file
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/OutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/OutputParser.java
new file mode 100644
index 0000000..41e8892
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/OutputParser.java
@@ -0,0 +1,41 @@
+package org.apache.airavata.helix.impl.task.submission.config;
+
+import org.apache.airavata.model.status.JobStatus;
+
+import java.util.Map;
+
+public interface OutputParser {
+
+    /**
+     * Parses the output of a job submission command and extracts the job ID.
+     * @param rawOutput raw output returned by the job submission command
+     * @return the job ID as a String, or null if no job ID is found
+     */
+    public String parseJobSubmission(String rawOutput) throws Exception;
+
+
+    /**
+     * Parses the output returned by the job submission task and identifies job submission failures.
+     * @param rawOutput raw output returned by the job submission command
+     * @return true if the job submission has failed, false otherwise
+     */
+    public boolean isJobSubmissionFailed(String rawOutput);
+
+
+    /**
+     * Parses the status of the given job from the raw output of the monitoring command.
+     * @param jobID ID of the job whose status should be extracted
+     * @param rawOutput raw output returned by the monitoring command
+     */
+    public JobStatus parseJobStatus(String jobID, String rawOutput) throws Exception;
+
+    /**
+     * Parses a bulk monitoring output and extracts the statuses of multiple jobs.
+     * @param userName user whose jobs are listed in the output
+     * @param statusMap map populated with the parsed statuses, keyed by job ID
+     * @param rawOutput raw output returned by the monitoring command
+     */
+    public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws Exception;
+
+
+    public String parseJobId(String jobName, String rawOutput) throws Exception;
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/RawCommandInfo.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/RawCommandInfo.java
new file mode 100644
index 0000000..d7f9fb3
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/RawCommandInfo.java
@@ -0,0 +1,22 @@
+package org.apache.airavata.helix.impl.task.submission.config;
+
+public class RawCommandInfo {
+
+    private String rawCommand;
+
+    public RawCommandInfo(String cmd) {
+        this.rawCommand = cmd;
+    }
+
+    public String getCommand() {
+        return this.rawCommand;
+    }
+
+    public String getRawCommand() {
+        return rawCommand;
+    }
+
+    public void setRawCommand(String rawCommand) {
+        this.rawCommand = rawCommand;
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/ForkJobConfiguration.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/ForkJobConfiguration.java
new file mode 100644
index 0000000..d25f17f
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/ForkJobConfiguration.java
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp;
+
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.commons.io.FilenameUtils;
+
+import java.io.File;
+import java.util.Map;
+
+public class ForkJobConfiguration implements JobManagerConfiguration {
+    private final Map<JobManagerCommand, String> jobManagerCommands;
+    private String jobDescriptionTemplateName;
+    private String scriptExtension;
+    private String installedPath;
+    private OutputParser parser;
+
+    public ForkJobConfiguration (String jobDescriptionTemplateName, String scriptExtension, String installedPath,
+                                 Map<JobManagerCommand, String> jobManagerCommands, OutputParser parser){
+        this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+        this.scriptExtension = scriptExtension;
+        this.parser = parser;
+	    installedPath = installedPath.trim();
+        if (installedPath.endsWith("/")) {
+            this.installedPath = installedPath;
+        } else {
+            this.installedPath = installedPath + "/";
+        }
+        this.jobManagerCommands = jobManagerCommands;
+    }
+
+    @Override
+    public RawCommandInfo getCancelCommand(String jobID) {
+        return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.DELETION).trim() + " " +
+                jobID);
+    }
+
+    @Override
+    public String getJobDescriptionTemplateName() {
+        return jobDescriptionTemplateName;
+    }
+
+    @Override
+    public RawCommandInfo getMonitorCommand(String jobID) {
+        return null;
+    }
+
+    @Override
+    public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+        return null;
+    }
+
+    @Override
+    public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+        return null;
+    }
+
+    @Override
+    public String getScriptExtension() {
+        return scriptExtension;
+    }
+
+    @Override
+    public RawCommandInfo getSubmitCommand(String workingDirectory, String forkFilePath) {
+        return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.SUBMISSION).trim() + " " +
+                workingDirectory + File.separator + FilenameUtils.getName(forkFilePath));
+    }
+
+    @Override
+    public OutputParser getParser() {
+        return parser;
+    }
+
+    @Override
+    public String getInstalledPath() {
+        return installedPath;
+    }
+
+    @Override
+    public String getBaseCancelCommand() {
+        return null;
+    }
+
+    @Override
+    public String getBaseMonitorCommand() {
+        return null;
+    }
+
+    @Override
+    public String getBaseSubmitCommand() {
+        return null;
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/JobUtil.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/JobUtil.java
new file mode 100644
index 0000000..36bce60
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/JobUtil.java
@@ -0,0 +1,58 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp;
+
+import org.apache.airavata.model.status.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobUtil {
+	private static final Logger log = LoggerFactory.getLogger(JobUtil.class);
+
+	public static JobState getJobState(String status) {
+		log.info("parsing the job status returned : " + status);
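+		// The status codes checked below appear to be aggregated from several schedulers
+		// (an assumption based on common conventions): PBS/UGE single-letter codes such as
+		// C, E, Q, R, S, W, qw, r and Er; Slurm codes such as CD, CG, CF, PD, CA and TO;
+		// and LSF codes such as DONE, PEND, RUN, PSUSP/USUSP/SSUSP, EXIT and UNKWN.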
+		if (status != null) {
+			if ("C".equals(status) || "CD".equals(status) || "E".equals(status) || "CG".equals(status) || "DONE".equals(status)) {
+				return JobState.COMPLETE;
+//			} else if ("H".equals(status) || "h".equals(status)) {
+//				return JobState.HELD;
+			} else if ("Q".equals(status) || "qw".equals(status) || "PEND".equals(status)) {
+				return JobState.QUEUED;
+			} else if ("R".equals(status) || "CF".equals(status) || "r".equals(status) || "RUN".equals(status)) {
+				return JobState.ACTIVE;
+//			} else if ("T".equals(status)) {
+//				return JobState.HELD;
+			} else if ("W".equals(status) || "PD".equals(status)) {
+				return JobState.QUEUED;
+			} else if ("S".equals(status) || "PSUSP".equals(status) || "USUSP".equals(status) || "SSUSP".equals(status)) {
+				return JobState.SUSPENDED;
+			} else if ("CA".equals(status)) {
+				return JobState.CANCELED;
+			} else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status) || "EXIT".equals(status)) {
+				return JobState.FAILED;
+			} else if ("PR".equals(status) || "Er".equals(status)) {
+				return JobState.FAILED;
+			} else if ("U".equals(status) || ("UNKWN".equals(status))) {
+				return JobState.UNKNOWN;
+			}
+		}
+		return JobState.UNKNOWN;
+	}
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/LSFJobConfiguration.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/LSFJobConfiguration.java
new file mode 100644
index 0000000..bccd7ee
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/LSFJobConfiguration.java
@@ -0,0 +1,120 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp;
+
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.commons.io.FilenameUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+public class LSFJobConfiguration implements JobManagerConfiguration {
+    private final static Logger logger = LoggerFactory.getLogger(LSFJobConfiguration.class);
+	private final Map<JobManagerCommand, String> jobManagerCommands;
+    private String jobDescriptionTemplateName;
+    private String scriptExtension;
+    private String installedPath;
+    private OutputParser parser;
+
+    public LSFJobConfiguration(String jobDescriptionTemplateName,
+                               String scriptExtension, String installedPath, Map<JobManagerCommand, String>
+		                               jobManagerCommands, OutputParser parser) {
+        this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+        this.scriptExtension = scriptExtension;
+        this.parser = parser;
+        if (installedPath.endsWith("/") || installedPath.isEmpty()) {
+            this.installedPath = installedPath;
+        } else {
+            this.installedPath = installedPath + "/";
+        }
+	    this.jobManagerCommands = jobManagerCommands;
+    }
+
+    @Override
+    public RawCommandInfo getCancelCommand(String jobID) {
+        return new RawCommandInfo(this.installedPath + "bkill " + jobID);
+    }
+
+    @Override
+    public String getJobDescriptionTemplateName() {
+        return jobDescriptionTemplateName;
+    }
+
+    @Override
+    public RawCommandInfo getMonitorCommand(String jobID) {
+        return new RawCommandInfo(this.installedPath + "bjobs " + jobID);
+    }
+
+    @Override
+    public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+        return new RawCommandInfo(this.installedPath + "bjobs -u " + userName);
+    }
+
+    @Override
+    public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+        return new RawCommandInfo(this.installedPath + "bjobs -J " + jobName);
+    }
+
+    @Override
+    public String getScriptExtension() {
+        return scriptExtension;
+    }
+
+    @Override
+    public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath) {
+        return new RawCommandInfo(this.installedPath + "bsub < " +
+                workingDirectory + File.separator + FilenameUtils.getName(pbsFilePath));
+    }
+
+    @Override
+    public OutputParser getParser() {
+        return parser;
+    }
+
+    public void setParser(OutputParser parser) {
+        this.parser = parser;
+    }
+
+    @Override
+    public String getInstalledPath() {
+        return installedPath;
+    }
+
+
+    @Override
+    public String getBaseCancelCommand() {
+        return "bkill";
+    }
+
+    @Override
+    public String getBaseMonitorCommand() {
+        return "bjobs";
+    }
+
+    @Override
+    public String getBaseSubmitCommand() {
+        return "bsub";
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/PBSJobConfiguration.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/PBSJobConfiguration.java
new file mode 100644
index 0000000..aeedeb9
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/PBSJobConfiguration.java
@@ -0,0 +1,122 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp;
+
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.commons.io.FilenameUtils;
+
+import java.io.File;
+import java.util.Map;
+
+public class PBSJobConfiguration implements JobManagerConfiguration {
+
+	private final Map<JobManagerCommand, String> jobManagerCommands;
+	private String jobDescriptionTemplateName;
+	private String scriptExtension;
+	private String installedPath;
+	private OutputParser parser;
+
+	public PBSJobConfiguration(String jobDescriptionTemplateName, String scriptExtension, String installedPath,
+	                           Map<JobManagerCommand, String> jobManagerCommands, OutputParser parser) {
+		this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+		this.scriptExtension = scriptExtension;
+		this.parser = parser;
+		installedPath = installedPath.trim();
+		if (installedPath.endsWith("/")) {
+			this.installedPath = installedPath;
+		} else {
+			this.installedPath = installedPath + "/";
+		}
+		this.jobManagerCommands = jobManagerCommands;
+	}
+
+	public RawCommandInfo getCancelCommand(String jobID) {
+		return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.DELETION).trim() + " " +
+				jobID);
+	}
+
+	public String getJobDescriptionTemplateName() {
+		return jobDescriptionTemplateName;
+	}
+
+	public void setJobDescriptionTemplateName(String jobDescriptionTemplateName) {
+		this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+	}
+
+	public RawCommandInfo getMonitorCommand(String jobID) {
+		return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.JOB_MONITORING).trim()
+				+ " -f " + jobID);
+	}
+
+	public String getScriptExtension() {
+		return scriptExtension;
+	}
+
+	public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath) {
+		return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.SUBMISSION).trim() + " " +
+				workingDirectory + File.separator + FilenameUtils.getName(pbsFilePath));
+	}
+
+	public String getInstalledPath() {
+		return installedPath;
+	}
+
+	public void setInstalledPath(String installedPath) {
+		this.installedPath = installedPath;
+	}
+
+	public OutputParser getParser() {
+		return parser;
+	}
+
+	public void setParser(OutputParser parser) {
+		this.parser = parser;
+	}
+
+	public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+		return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.JOB_MONITORING).trim()
+				+ " -u " + userName);
+	}
+
+	@Override
+	public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+		// For PBS there is no option to get jobDetails by JobName, so we search with userName
+		return new RawCommandInfo(this.installedPath + jobManagerCommands.get(JobManagerCommand.JOB_MONITORING).trim()
+				+ " -u " + userName + " -f  | grep \"Job_Name = " + jobName + "\" -B1");
+	}
+
+	@Override
+	public String getBaseCancelCommand() {
+		return jobManagerCommands.get(JobManagerCommand.DELETION).trim();
+	}
+
+	@Override
+	public String getBaseMonitorCommand() {
+		return jobManagerCommands.get(JobManagerCommand.JOB_MONITORING).trim();
+	}
+
+	@Override
+	public String getBaseSubmitCommand() {
+		return jobManagerCommands.get(JobManagerCommand.SUBMISSION).trim();
+	}
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/SlurmJobConfiguration.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/SlurmJobConfiguration.java
new file mode 100644
index 0000000..fc431ce
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/SlurmJobConfiguration.java
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp;
+
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.commons.io.FilenameUtils;
+
+import java.io.File;
+import java.util.Map;
+
+public class SlurmJobConfiguration implements JobManagerConfiguration {
+	private final Map<JobManagerCommand, String> jMCommands;
+    private String jobDescriptionTemplateName;
+    private String scriptExtension;
+    private String installedPath;
+    private OutputParser parser;
+
+    public SlurmJobConfiguration(String jobDescriptionTemplateName,
+                                 String scriptExtension, String installedPath, Map<JobManagerCommand, String>
+		                                 jobManagerCommands, OutputParser parser) {
+        this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+        this.scriptExtension = scriptExtension;
+        this.parser = parser;
+	    installedPath = installedPath.trim();
+        if (installedPath.endsWith("/")) {
+            this.installedPath = installedPath;
+        } else {
+            this.installedPath = installedPath + "/";
+        }
+	    this.jMCommands = jobManagerCommands;
+    }
+
+    public RawCommandInfo getCancelCommand(String jobID) {
+        return new RawCommandInfo(this.installedPath + jMCommands.get(JobManagerCommand.DELETION).trim() + " " + jobID);
+    }
+
+    public String getJobDescriptionTemplateName() {
+        return jobDescriptionTemplateName;
+    }
+
+    public void setJobDescriptionTemplateName(String jobDescriptionTemplateName) {
+        this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+    }
+
+    public RawCommandInfo getMonitorCommand(String jobID) {
+        return new RawCommandInfo(this.installedPath + jMCommands.get(JobManagerCommand.JOB_MONITORING).trim() + " -j " + jobID);
+    }
+
+    public String getScriptExtension() {
+        return scriptExtension;
+    }
+
+    public RawCommandInfo getSubmitCommand(String workingDirectory,String pbsFilePath) {
+          return new RawCommandInfo(this.installedPath + jMCommands.get(JobManagerCommand.SUBMISSION).trim() + " " +
+                workingDirectory + File.separator + FilenameUtils.getName(pbsFilePath));
+    }
+
+    public String getInstalledPath() {
+        return installedPath;
+    }
+
+    public void setInstalledPath(String installedPath) {
+        this.installedPath = installedPath;
+    }
+
+    public OutputParser getParser() {
+        return parser;
+    }
+
+    public void setParser(OutputParser parser) {
+        this.parser = parser;
+    }
+
+    public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+        return new RawCommandInfo(this.installedPath + jMCommands.get(JobManagerCommand.JOB_MONITORING).trim() + " -u " + userName);
+    }
+
+    @Override
+    public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+        return new RawCommandInfo(this.installedPath + jMCommands.get(JobManagerCommand.JOB_MONITORING).trim() + " -n " + jobName + " -u " + userName);
+    }
+
+    @Override
+    public String getBaseCancelCommand() {
+	    return jMCommands.get(JobManagerCommand.DELETION).trim();
+    }
+
+    @Override
+    public String getBaseMonitorCommand() {
+        return jMCommands.get(JobManagerCommand.JOB_MONITORING).trim();
+    }
+
+    @Override
+    public String getBaseSubmitCommand() {
+        return jMCommands.get(JobManagerCommand.SUBMISSION).trim();
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/UGEJobConfiguration.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/UGEJobConfiguration.java
new file mode 100644
index 0000000..6a12966
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/UGEJobConfiguration.java
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp;
+
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.commons.io.FilenameUtils;
+
+import java.io.File;
+import java.util.Map;
+
+public class UGEJobConfiguration implements JobManagerConfiguration {
+	private final Map<JobManagerCommand, String> jobManagerCommands;
+    private String jobDescriptionTemplateName;
+    private String scriptExtension;
+    private String installedPath;
+    private OutputParser parser;
+
+    public UGEJobConfiguration(String jobDescriptionTemplateName,
+                               String scriptExtension, String installedPath, Map<JobManagerCommand, String>
+		                               jobManagerCommands, OutputParser parser) {
+        this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+        this.scriptExtension = scriptExtension;
+        this.parser = parser;
+        if (installedPath.endsWith("/")) {
+            this.installedPath = installedPath;
+        } else {
+            this.installedPath = installedPath + "/";
+        }
+	    this.jobManagerCommands = jobManagerCommands;
+    }
+
+    public RawCommandInfo getCancelCommand(String jobID) {
+        return new RawCommandInfo(this.installedPath + "qdel " + jobID);
+    }
+
+    public String getJobDescriptionTemplateName() {
+        return jobDescriptionTemplateName;
+    }
+
+    public void setJobDescriptionTemplateName(String jobDescriptionTemplateName) {
+        this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+    }
+
+    public RawCommandInfo getMonitorCommand(String jobID) {
+        return new RawCommandInfo(this.installedPath + "qstat -j " + jobID);
+    }
+
+    public String getScriptExtension() {
+        return scriptExtension;
+    }
+
+    public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath) {
+        return new RawCommandInfo(this.installedPath + "qsub " +
+                workingDirectory + File.separator + FilenameUtils.getName(pbsFilePath));
+    }
+
+    public String getInstalledPath() {
+        return installedPath;
+    }
+
+    public void setInstalledPath(String installedPath) {
+        this.installedPath = installedPath;
+    }
+
+    public OutputParser getParser() {
+        return parser;
+    }
+
+    public void setParser(OutputParser parser) {
+        this.parser = parser;
+    }
+
+    public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+        return new RawCommandInfo(this.installedPath + "qstat -u " + userName);
+    }
+
+    @Override
+    public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+        // qstat has no direct option to look up job details by job name, so we search by user name
+        return new RawCommandInfo(this.installedPath + "qstat -u " + userName);
+    }
+
+    @Override
+    public String getBaseCancelCommand() {
+        return "qdel";
+    }
+
+    @Override
+    public String getBaseMonitorCommand() {
+        return "qstat";
+    }
+
+    @Override
+    public String getBaseSubmitCommand() {
+        return "qsub";
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/AiravataCustomCommandOutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/AiravataCustomCommandOutputParser.java
new file mode 100644
index 0000000..c3a5a2e
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/AiravataCustomCommandOutputParser.java
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp.parser;
+
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class AiravataCustomCommandOutputParser implements OutputParser {
+    private static final Logger log = LoggerFactory.getLogger(AiravataCustomCommandOutputParser.class);
+
+    @Override
+    public String parseJobSubmission(String rawOutput) throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public JobStatus parseJobStatus(String jobID, String rawOutput) throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String parseJobId(String jobName, String rawOutput) throws Exception {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/ForkOutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/ForkOutputParser.java
new file mode 100644
index 0000000..a4f48cc
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/ForkOutputParser.java
@@ -0,0 +1,58 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp.parser;
+
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class ForkOutputParser implements OutputParser {
+    private static final Logger log = LoggerFactory.getLogger(ForkOutputParser.class);
+
+    @Override
+    public String parseJobSubmission(String rawOutput) throws Exception {
+	    return AiravataUtils.getId("JOB_ID_");
+    }
+
+    @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        return false;
+    }
+
+    @Override
+    public JobStatus parseJobStatus(String jobID, String rawOutput) throws Exception {
+        return null;
+    }
+
+    @Override
+    public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws Exception {
+
+    }
+
+    @Override
+    public String parseJobId(String jobName, String rawOutput) throws Exception {
+        // For fork jobs there is no job ID, hence airavata generates a job ID
+        return AiravataUtils.getId(jobName);
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/LSFOutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/LSFOutputParser.java
new file mode 100644
index 0000000..0bf812f
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/LSFOutputParser.java
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp.parser;
+
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.imp.JobUtil;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class LSFOutputParser implements OutputParser {
+    private final static Logger logger = LoggerFactory.getLogger(LSFOutputParser.class);
+
+    @Override
+    public String parseJobSubmission(String rawOutput) throws Exception {
+        logger.debug(rawOutput);
+        if (rawOutput.indexOf("<") >= 0) {
+            return rawOutput.substring(rawOutput.indexOf("<")+1,rawOutput.indexOf(">"));
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        return false;
+    }
+
+    @Override
+    public JobStatus parseJobStatus(String jobID, String rawOutput) throws Exception {
+        logger.debug(rawOutput);
+        // TODO: not used anymore
+        return null;
+    }
+
+    @Override
+    public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws Exception {
+        logger.debug(rawOutput);
+
+        String[] info = rawOutput.split("\n");
+//        int lastStop = 0;
+        for (String jobID : statusMap.keySet()) {
+            String jobName = jobID.split(",")[1];
+            boolean found = false;
+            for (int i = 0; i < info.length; i++) {
+                if (info[i].contains(jobName.substring(0, Math.min(jobName.length(), 8)))) {
+                    // now starts processing this line
+                    logger.info(info[i]);
+                    String correctLine = info[i];
+                    String[] columns = correctLine.split(" ");
+                    List<String> columnList = new ArrayList<String>();
+                    for (String s : columns) {
+                        if (!"".equals(s)) {
+                            columnList.add(s);
+                        }
+                    }
+//                    lastStop = i + 1;
+                    try {
+                        statusMap.put(jobID, new JobStatus(JobState.valueOf(columnList.get(2))));
+                    } catch (IndexOutOfBoundsException e) {
+                        statusMap.put(jobID, new JobStatus(JobState.valueOf("U")));
+                    }
+                    found = true;
+                    break;
+                }
+            }
+            if (!found) {
+                logger.error("Couldn't find the status of the Job with JobName: " + jobName + ", Job Id: " + jobID.split(",")[0]);
+            }
+        }
+    }
+
+    @Override
+    public String parseJobId(String jobName, String rawOutput) throws Exception {
+        String regJobId = "jobId";
+        Pattern pattern = Pattern.compile("(?=(?<" + regJobId + ">\\d+)\\s+\\w+\\s+" + jobName + ")"); // regex - look ahead and match
+        if (rawOutput != null) {
+            Matcher matcher = pattern.matcher(rawOutput);
+            if (matcher.find()) {
+                return matcher.group(regJobId);
+            } else {
+                logger.error("No match is found for JobName");
+                return null;
+            }
+        } else {
+            logger.error("Error: RawOutput shouldn't be null");
+            return null;
+        }
+    }
+
+    public static void main(String[] args) {
+        String test = "Job <2477982> is submitted to queue <short>.";
+        System.out.println(test.substring(test.indexOf("<")+1, test.indexOf(">")));
+        String test1 = "JOBID   USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME\n" +
+                "2636607 lg11w   RUN   long       ghpcc06     c11b02      *069656647 Mar  7 00:58\n" +
+                "2636582 lg11w   RUN   long       ghpcc06     c02b01      2134490944 Mar  7 00:48";
+        Map<String, JobStatus> statusMap = new HashMap<String, JobStatus>();
+        statusMap.put("2477983,2134490944", new JobStatus(JobState.UNKNOWN));
+        LSFOutputParser lsfOutputParser = new LSFOutputParser();
+        try {
+            lsfOutputParser.parseJobStatuses("cjh", statusMap, test1);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+        System.out.println(statusMap.get("2477983,2134490944"));
+
+    }
+}
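
The bsub acknowledgement parsed above has the form shown in main(), e.g. "Job <2477982> is submitted to queue <short>.". A minimal regex-based sketch of the same extraction, with an illustrative class name, in case the indexOf-based slicing ever needs hardening:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class LsfSubmissionIdSketch {
        // Captures the numeric id between the angle brackets of a bsub acknowledgement line.
        private static final Pattern BSUB_ACK = Pattern.compile("Job <(\\d+)> is submitted");

        static String extractJobId(String rawOutput) {
            Matcher matcher = BSUB_ACK.matcher(rawOutput);
            return matcher.find() ? matcher.group(1) : null;
        }

        public static void main(String[] args) {
            // prints 2477982
            System.out.println(extractJobId("Job <2477982> is submitted to queue <short>."));
        }
    }
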
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/PBSOutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/PBSOutputParser.java
new file mode 100644
index 0000000..3be8c8a
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/PBSOutputParser.java
@@ -0,0 +1,142 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp.parser;
+
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.imp.JobUtil;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class PBSOutputParser implements OutputParser {
+    private static final Logger log = LoggerFactory.getLogger(PBSOutputParser.class);
+
+    public String parseJobSubmission(String rawOutput) {
+        log.debug(rawOutput);
+        String jobId = rawOutput;
+        if (!rawOutput.isEmpty() && rawOutput.contains("\n")){
+            String[] split = rawOutput.split("\n");
+            if (split.length != 0){
+                jobId = split[0];
+            }
+        }
+        return jobId;  // In PBS the stdout is directly the jobID
+    }
+
+    @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        return false;
+    }
+
+    public JobStatus parseJobStatus(String jobID, String rawOutput) {
+        boolean jobFound = false;
+        log.debug(rawOutput);
+        String[] info = rawOutput.split("\n");
+        String[] line = null;
+        int index = 0;
+        for (String anInfo : info) {
+            index++;
+            if (anInfo.contains("Job Id:")) {
+                if (anInfo.contains(jobID)) {
+                    jobFound = true;
+                    break;
+                }
+            }
+        }
+        if (jobFound) {
+            for (int i = index; i < info.length; i++) {
+                String anInfo = info[i];
+                if (anInfo.contains("=")) {
+                    line = anInfo.split("=", 2);
+                    if (line.length != 0) {
+                        if (line[0].contains("job_state")) {
+	                        return new JobStatus(JobUtil.getJobState(line[1].replaceAll(" ", "")));
+                        }
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) {
+        log.debug(rawOutput);
+        String[] info = rawOutput.split("\n");
+//        int lastStop = 0;
+        for (String jobID : statusMap.keySet()) {
+            String jobName = jobID.split(",")[1];
+            boolean found = false;
+            for (int i = 0; i < info.length; i++) {
+                if (info[i].contains(jobName.substring(0,8))) {
+                    // now starts processing this line
+                    log.info(info[i]);
+                    String correctLine = info[i];
+                    String[] columns = correctLine.split(" ");
+                    List<String> columnList = new ArrayList<String>();
+                    for (String s : columns) {
+                        if (!"".equals(s)) {
+                            columnList.add(s);
+                        }
+                    }
+//                    lastStop = i + 1;
+                    try {
+                        statusMap.put(jobID, new JobStatus(JobUtil.getJobState(columnList.get(9))));
+                    } catch (IndexOutOfBoundsException e) {
+                        statusMap.put(jobID, new JobStatus(JobUtil.getJobState("U")));
+                    }
+                    found = true;
+                    break;
+                }
+            }
+            if (!found) {
+                log.error("Couldn't find the status of the Job with JobName: " + jobName + ", Job Id: " + jobID.split(",")[0]);
+            }
+        }
+    }
+
+    @Override
+    public String parseJobId(String jobName, String rawOutput) throws Exception {
+        /* output will look like
+        Job Id: 2080802.gordon-fe2.local
+            Job_Name = A312402627
+        */
+        String regJobId = "jobId";
+        Pattern pattern = Pattern.compile("(?<" + regJobId + ">[^\\s]*)\\s*.* " + jobName);
+        if (rawOutput != null) {
+            Matcher matcher = pattern.matcher(rawOutput);
+            if (matcher.find()) {
+                return matcher.group(regJobId);
+            } else {
+                log.error("No match is found for JobName");
+                return null;
+            }
+        } else {
+            log.error("Error: RawOutput shouldn't be null");
+            return null;
+        }
+    }
+
+
+}
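
The parseJobId() pattern above targets qstat -f style output where the "Job Id:" line precedes the "Job_Name =" line, as the inline comment shows. A small self-contained check of that behaviour; the sample values mirror the comment and are illustrative only:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class PbsJobIdLookupSketch {
        public static void main(String[] args) {
            String jobName = "A312402627";
            String rawOutput = "Job Id: 2080802.gordon-fe2.local\n"
                    + "    Job_Name = A312402627\n";
            // Same pattern as PBSOutputParser.parseJobId: capture the token preceding the job name match.
            Pattern pattern = Pattern.compile("(?<jobId>[^\\s]*)\\s*.* " + jobName);
            Matcher matcher = pattern.matcher(rawOutput);
            // prints 2080802.gordon-fe2.local
            System.out.println(matcher.find() ? matcher.group("jobId") : "no match");
        }
    }
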
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/SlurmOutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/SlurmOutputParser.java
new file mode 100644
index 0000000..3ebbcfd
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/SlurmOutputParser.java
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp.parser;
+
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.helix.impl.task.submission.config.imp.JobUtil;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SlurmOutputParser implements OutputParser {
+    private static final Logger log = LoggerFactory.getLogger(SlurmOutputParser.class);
+    public static final int JOB_NAME_OUTPUT_LENGTH = 8;
+    public static final String STATUS = "status";
+	public static final String JOBID = "jobId";
+
+
+    /**
+     * This can be used to parse the output of sbatch and extract the jobID from the content
+     *
+     * @param rawOutput
+     * @return
+     */
+    public String parseJobSubmission(String rawOutput) throws Exception {
+	    log.info(rawOutput);
+	    Pattern pattern = Pattern.compile("Submitted batch job (?<" + JOBID + ">[^\\s]*)");
+	    Matcher matcher = pattern.matcher(rawOutput);
+	    if (matcher.find()) {
+		    return matcher.group(JOBID);
+	    }
+	    return "";
+    }
+
+    @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        Pattern pattern = Pattern.compile("FAILED");
+        Matcher matcher = pattern.matcher(rawOutput);
+        return matcher.find();
+    }
+
+    public JobStatus parseJobStatus(String jobID, String rawOutput) throws Exception {
+        log.info(rawOutput);
+        Pattern pattern = Pattern.compile(jobID + "(?=\\s+\\S+\\s+\\S+\\s+\\S+\\s+(?<" + STATUS + ">\\w+))");
+        Matcher matcher = pattern.matcher(rawOutput);
+        if (matcher.find()) {
+	        return new JobStatus(JobUtil.getJobState(matcher.group(STATUS)));
+        }
+        return null;
+    }
+
+    public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws Exception {
+        log.debug(rawOutput);
+        String[] info = rawOutput.split("\n");
+        String lastString = info[info.length - 1];
+        if (lastString.contains("JOBID") || lastString.contains("PARTITION")) {
+            log.info("There are no jobs with this username ... ");
+            return;
+        }
+//        int lastStop = 0;
+        for (String jobID : statusMap.keySet()) {
+            String jobId = jobID.split(",")[0];
+            String jobName = jobID.split(",")[1];
+            boolean found = false;
+            for (int i = 0; i < info.length; i++) {
+                if (info[i].contains(jobName.substring(0, 8))) {
+                    // now starts processing this line
+                    log.info(info[i]);
+                    String correctLine = info[i];
+                    String[] columns = correctLine.split(" ");
+                    List<String> columnList = new ArrayList<String>();
+                    for (String s : columns) {
+                        if (!"".equals(s)) {
+                            columnList.add(s);
+                        }
+                    }
+                    try {
+                        statusMap.put(jobID, new JobStatus(JobState.valueOf(columnList.get(4))));
+                    } catch (IndexOutOfBoundsException e) {
+                        statusMap.put(jobID, new JobStatus(JobState.valueOf("U")));
+                    }
+                    found = true;
+                    break;
+                }
+            }
+            if (!found) {
+                log.error("Couldn't find the status of the Job with JobName: " + jobName + ", Job Id: " + jobId);
+            }
+        }
+    }
+
+    @Override
+    public String parseJobId(String jobName, String rawOutput) throws Exception {
+        String regJobId = "jobId";
+        if (jobName == null) {
+            return null;
+        } else if (jobName.length() > JOB_NAME_OUTPUT_LENGTH) {
+            jobName = jobName.substring(0, JOB_NAME_OUTPUT_LENGTH);
+        }
+        Pattern pattern = Pattern.compile("(?=(?<" + regJobId + ">\\d+)\\s+\\w+\\s+" + jobName + ")"); // regex - look ahead and match
+        if (rawOutput != null) {
+            Matcher matcher = pattern.matcher(rawOutput);
+            if (matcher.find()) {
+                return matcher.group(regJobId);
+            } else {
+                log.error("No match is found for JobName");
+                return null;
+            }
+        } else {
+            log.error("Error: RawOutput shouldn't be null");
+            return null;
+        }
+    }
+}
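
sbatch acknowledges a submission with a single line of the form "Submitted batch job <id>", which is what parseJobSubmission() keys on. A standalone illustration with a made-up job id:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class SlurmSubmissionIdSketch {
        public static void main(String[] args) {
            String rawOutput = "Submitted batch job 2977981";
            // Same pattern as SlurmOutputParser.parseJobSubmission.
            Pattern pattern = Pattern.compile("Submitted batch job (?<jobId>[^\\s]*)");
            Matcher matcher = pattern.matcher(rawOutput);
            // prints 2977981
            System.out.println(matcher.find() ? matcher.group("jobId") : "");
        }
    }
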
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/UGEOutputParser.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/UGEOutputParser.java
new file mode 100644
index 0000000..0f457ff
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/imp/parser/UGEOutputParser.java
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task.submission.config.imp.parser;
+
+import org.apache.airavata.helix.impl.task.submission.config.OutputParser;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class UGEOutputParser implements OutputParser {
+    private static final Logger log = LoggerFactory.getLogger(UGEOutputParser.class);
+    public static final String JOB_ID = "jobId";
+
+	public String parseJobSubmission(String rawOutput) {
+		log.debug(rawOutput);
+		if (rawOutput != null && !rawOutput.isEmpty() && !isJobSubmissionFailed(rawOutput)) {
+			String[] info = rawOutput.split("\n");
+			String lastLine = info[info.length - 1];
+			return lastLine.split(" ")[2]; // For UGE the job id is the third token of the qsub acknowledgement line
+		} else {
+			return "";
+		}
+	}
+
+    @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        Pattern pattern = Pattern.compile("Rejecting");
+        Matcher matcher = pattern.matcher(rawOutput);
+        return matcher.find();
+    }
+
+    public JobStatus parseJobStatus(String jobID, String rawOutput) {
+        Pattern pattern = Pattern.compile("job_number:[\\s]+" + jobID);
+        Matcher matcher = pattern.matcher(rawOutput);
+        if (matcher.find()) {
+	        return new JobStatus(JobState.QUEUED); // fixme; return correct status.
+        }
+	    return new JobStatus(JobState.UNKNOWN);
+    }
+
+    public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) {
+        log.debug(rawOutput);
+        String[] info = rawOutput.split("\n");
+        int lastStop = 0;
+        for (String jobID : statusMap.keySet()) {
+            for (int i = lastStop; i < info.length; i++) {
+               if (jobID.split(",")[0].contains(info[i].split(" ")[0]) && !"".equals(info[i].split(" ")[0])) {
+                   // now starts processing this line
+                   log.info(info[i]);
+                   String correctLine = info[i];
+                   String[] columns = correctLine.split(" ");
+                   List<String> columnList = new ArrayList<String>();
+                   for (String s : columns) {
+                       if (!"".equals(s)) {
+                           columnList.add(s);
+                       }
+                   }
+                   lastStop = i+1;
+                   if ("E".equals(columnList.get(4))) {
+                       // Another status also starts with the letter E, so tweak the value
+                       // to "Er" to disambiguate it from the error status before mapping
+                       columnList.set(4, "Er");
+                   }
+	               statusMap.put(jobID, new JobStatus(JobState.valueOf(columnList.get(4))));
+	               break;
+               }
+            }
+        }
+    }
+
+    @Override
+    public String parseJobId(String jobName, String rawOutput) throws Exception {
+        if (jobName.length() > 10) {
+            jobName = jobName.substring(0, 10);
+        }
+        Pattern pattern = Pattern.compile("(?<" + JOB_ID + ">\\S+)\\s+\\S+\\s+(" + jobName + ")");
+        Matcher matcher = pattern.matcher(rawOutput);
+        if (matcher.find()) {
+            return matcher.group(JOB_ID);
+        }
+        return null;
+    }
+}
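
parseJobSubmission() above takes the last line of the qsub output and returns its third whitespace-separated token. Assuming the usual Grid Engine acknowledgement format (the sample line below is an assumption, not taken from this change), the extraction works out like this:

    class UgeSubmissionIdSketch {
        public static void main(String[] args) {
            // Assumed qsub acknowledgement; the third token is the job id.
            String rawOutput = "Your job 2167655 (\"A312402627\") has been submitted";
            String[] info = rawOutput.split("\n");
            String lastLine = info[info.length - 1];
            // prints 2167655
            System.out.println(lastLine.split(" ")[2]);
        }
    }
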
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
new file mode 100644
index 0000000..fb9917f
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -0,0 +1,232 @@
+package org.apache.airavata.helix.impl.task.submission.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.JobSubmissionOutput;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.submission.GroovyMapData;
+import org.apache.airavata.helix.impl.task.submission.SubmissionUtil;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@TaskDef(name = "Default Job Submission")
+public class DefaultJobSubmissionTask extends JobSubmissionTask {
+
+    private static final Logger logger = LogManager.getLogger(DefaultJobSubmissionTask.class);
+
+    public static final String DEFAULT_JOB_ID = "DEFAULT_JOB_ID";
+
+    @Override
+    public TaskResult onRun(TaskHelper taskHelper) {
+        try {
+            GroovyMapData groovyMapData = new GroovyMapData();
+
+
+            JobModel jobModel = new JobModel();
+            jobModel.setProcessId(getProcessId());
+            jobModel.setWorkingDir(groovyMapData.getWorkingDirectory());
+            jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+            jobModel.setTaskId(getTaskId());
+            jobModel.setJobName(groovyMapData.getJobName());
+
+            File jobFile = SubmissionUtil.createJobFile(groovyMapData);
+
+
+            if (jobFile != null && jobFile.exists()) {
+                jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+                AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(),
+                        getJobSubmissionProtocol().name(), getComputeResourceCredentialToken());
+
+                JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, jobFile, groovyMapData.getWorkingDirectory());
+
+                jobModel.setExitCode(submissionOutput.getExitCode());
+                jobModel.setStdErr(submissionOutput.getStdErr());
+                jobModel.setStdOut(submissionOutput.getStdOut());
+
+                String jobId = submissionOutput.getJobId();
+
+                if (submissionOutput.getExitCode() != 0 || submissionOutput.isJobSubmissionFailed()) {
+                    jobModel.setJobId(DEFAULT_JOB_ID);
+                    if (submissionOutput.isJobSubmissionFailed()) {
+                        List<JobStatus> statusList = new ArrayList<>();
+                        statusList.add(new JobStatus(JobState.FAILED));
+                        statusList.get(0).setReason(submissionOutput.getFailureReason());
+                        jobModel.setJobStatuses(statusList);
+                        saveJobModel(jobModel);
+                        logger.error("expId: " + getExperimentId() + ", processid: " + getProcessId()+ ", taskId: " +
+                                getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName());
+
+                        ErrorModel errorModel = new ErrorModel();
+                        errorModel.setUserFriendlyMessage(submissionOutput.getFailureReason());
+                        errorModel.setActualErrorMessage(submissionOutput.getFailureReason());
+                        saveExperimentError(errorModel);
+                        saveProcessError(errorModel);
+                        saveTaskError(errorModel);
+                        //taskStatus.setState(TaskState.FAILED);
+                        //taskStatus.setReason("Job submission command didn't return a jobId");
+                        //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                        //taskContext.setTaskStatus(taskStatus);
+                        return onFail("Job submission command didn't return a jobId", false, null);
+
+                    } else {
+                        String msg;
+                        saveJobModel(jobModel);
+                        ErrorModel errorModel = new ErrorModel();
+                        if (submissionOutput.getExitCode() != Integer.MIN_VALUE) {
+                            msg = "expId:" + getExperimentId() + ", processId:" + getProcessId() + ", taskId: " + getTaskId() +
+                                    " returned a non-zero exit code:" + submissionOutput.getExitCode() + " for JobName:" + jobModel.getJobName() +
+                                    ", with failure reason : " + submissionOutput.getFailureReason() +
+                                    ". Hence changing job state to Failed.";
+                            errorModel.setActualErrorMessage(submissionOutput.getFailureReason());
+                        } else {
+                            msg = "expId:" + getExperimentId() + ", processId:" + getProcessId() + ", taskId: " + getTaskId() +
+                                    " didn't return a valid job submission exit code for JobName:" + jobModel.getJobName() +
+                                    ", with failure reason : stdout -> " + submissionOutput.getStdOut() +
+                                    " stderr -> " + submissionOutput.getStdErr() + ". Hence changing job state to Failed.";
+                            errorModel.setActualErrorMessage(msg);
+                        }
+                        logger.error(msg);
+                        errorModel.setUserFriendlyMessage(msg);
+                        saveExperimentError(errorModel);
+                        saveProcessError(errorModel);
+                        saveTaskError(errorModel);
+                        //taskStatus.setState(TaskState.FAILED);
+                        //taskStatus.setReason(msg);
+                        //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                        //taskContext.setTaskStatus(taskStatus);
+                        return onFail(msg, false, null);
+                    }
+
+                    //TODO save task status??
+                } else if (jobId != null && !jobId.isEmpty()) {
+                    jobModel.setJobId(jobId);
+                    saveJobModel(jobModel);
+                    JobStatus jobStatus = new JobStatus();
+                    jobStatus.setJobState(JobState.SUBMITTED);
+                    jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
+                    jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                    saveJobStatus(jobModel);
+
+                    if (verifyJobSubmissionByJobId(adaptor, jobId)) {
+                        jobStatus.setJobState(JobState.QUEUED);
+                        jobStatus.setReason("Verification step succeeded");
+                        jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                        jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                        saveJobStatus(jobModel);
+                    }
+
+                    if (getComputeResourceDescription().isGatewayUsageReporting()){
+                        String loadCommand = getComputeResourceDescription().getGatewayUsageModuleLoadCommand();
+                        String usageExecutable = getComputeResourceDescription().getGatewayUsageExecutable();
+                        ExperimentModel experiment = (ExperimentModel)getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, getExperimentId());
+                        String username = experiment.getUserName() + "@" + getGatewayComputeResourcePreference().getUsageReportingGatewayId();
+                        RawCommandInfo rawCommandInfo = new RawCommandInfo(loadCommand + " && " + usageExecutable + " -gateway_user " +  username  +
+                                " -submit_time \"`date '+%F %T %:z'`\"  -jobid " + jobId );
+                        adaptor.executeCommand(rawCommandInfo.getRawCommand(), null);
+                    }
+                    //taskStatus = new TaskStatus(TaskState.COMPLETED);
+                    //taskStatus.setReason("Submitted job to compute resource");
+                    //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+
+                    return onSuccess("Submitted job to compute resource");
+                } else {
+                    int verificationTryCount = 0;
+                    while (verificationTryCount++ < 3) {
+                        String verifyJobId = verifyJobSubmission(adaptor, jobModel.getJobName(), getComputeResourceLoginUserName());
+                        if (verifyJobId != null && !verifyJobId.isEmpty()) {
+                            // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+                            jobId = verifyJobId;
+                            jobModel.setJobId(jobId);
+                            saveJobModel(jobModel);
+                            JobStatus jobStatus = new JobStatus();
+                            jobStatus.setJobState(JobState.QUEUED);
+                            jobStatus.setReason("Verification step succeeded");
+                            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                            jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                            saveJobStatus(jobModel);
+                            //taskStatus.setState(TaskState.COMPLETED);
+                            //taskStatus.setReason("Submitted job to compute resource");
+                            //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                            break;
+                        }
+                        logger.info("Verify step return invalid jobId, retry verification step in " + (verificationTryCount * 10) + " secs");
+                        Thread.sleep(verificationTryCount * 10000);
+                    }
+                }
+
+                if (jobId == null || jobId.isEmpty()) {
+                    jobModel.setJobId(DEFAULT_JOB_ID);
+                    saveJobModel(jobModel);
+                    String msg = "expId:" + getExperimentId() + " Couldn't find " +
+                            "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
+                            "don't return a valid JobId. " + "Hence changing experiment state to Failed";
+                    logger.error(msg);
+                    ErrorModel errorModel = new ErrorModel();
+                    errorModel.setUserFriendlyMessage(msg);
+                    errorModel.setActualErrorMessage(msg);
+                    saveExperimentError(errorModel);
+                    saveProcessError(errorModel);
+                    saveTaskError(errorModel);
+                    //taskStatus.setState(TaskState.FAILED);
+                    //taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
+                    //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    return onFail("Couldn't find job id in both submitted and verified steps", false, null);
+                } else {
+                    //GFacUtils.saveJobModel(processContext, jobModel);
+                    return onSuccess("Submitted job to compute resource");
+                }
+
+            }  else {
+                //taskStatus.setState(TaskState.FAILED);
+                if (jobFile == null) {
+                    return onFail("Job file is null", true, null);
+                  //  taskStatus.setReason("JobFile is null");
+                } else {
+                    //taskStatus.setReason("Job file doesn't exist");
+                    return onFail("Job file doesn't exist", true, null);
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Task failed due to unexpected issue", e);
+            return onFail("Task failed due to unexpected issue", false, null);
+        }
+        // TODO get rid of this
+        return onFail("Task moved to an unknown state", false, null);
+    }
+
+    private boolean verifyJobSubmissionByJobId(AgentAdaptor agentAdaptor, String jobID) throws Exception {
+        JobStatus status = getJobStatus(agentAdaptor, jobID);
+        return status != null &&  status.getJobState() != JobState.UNKNOWN;
+    }
+
+    private String verifyJobSubmission(AgentAdaptor agentAdaptor, String jobName, String userName) {
+        String jobId = null;
+        try {
+            jobId  = getJobIdByJobName(agentAdaptor, jobName, userName);
+        } catch (Exception e) {
+            logger.error("Error while verifying JobId from JobName " + jobName);
+        }
+        return jobId;
+    }
+
+    @Override
+    public void onCancel() {
+
+    }
+}
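
When gateway usage reporting is enabled, the task concatenates the module load command, the reporting executable and the job metadata into one shell command before handing it to the adaptor. A sketch of the resulting command string; the module name, executable, user and job id below are placeholders:

    class UsageReportCommandSketch {
        public static void main(String[] args) {
            // Placeholder values; mirrors the concatenation in DefaultJobSubmissionTask.onRun().
            String loadCommand = "module load gateway-usage-reporting";
            String usageExecutable = "gateway_submit_attributes";
            String username = "testuser@example-gateway";
            String jobId = "2977981";
            String rawCommand = loadCommand + " && " + usageExecutable + " -gateway_user " + username
                    + " -submit_time \"`date '+%F %T %:z'`\"  -jobid " + jobId;
            System.out.println(rawCommand);
        }
    }
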
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
new file mode 100644
index 0000000..da04365
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
@@ -0,0 +1,79 @@
+package org.apache.airavata.helix.impl.task.submission.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.JobSubmissionOutput;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.submission.GroovyMapData;
+import org.apache.airavata.helix.impl.task.submission.SubmissionUtil;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskResult;
+
+import java.io.File;
+import java.util.Arrays;
+
+@TaskDef(name = "Fork Job Submission")
+public class ForkJobSubmissionTask extends JobSubmissionTask {
+
+    @Override
+    public TaskResult onRun(TaskHelper taskHelper) {
+
+        try {
+            GroovyMapData groovyMapData = new GroovyMapData();
+
+            JobModel jobModel = new JobModel();
+            jobModel.setProcessId(getProcessId());
+            jobModel.setWorkingDir(groovyMapData.getWorkingDirectory());
+            jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+            jobModel.setTaskId(getTaskId());
+            jobModel.setJobName(groovyMapData.getJobName());
+
+            File jobFile = SubmissionUtil.createJobFile(groovyMapData);
+
+            if (jobFile != null && jobFile.exists()) {
+                jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+                AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(),
+                        getJobSubmissionProtocol().name(), getComputeResourceCredentialToken());
+
+                JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, jobFile, groovyMapData.getWorkingDirectory());
+
+                jobModel.setExitCode(submissionOutput.getExitCode());
+                jobModel.setStdErr(submissionOutput.getStdErr());
+                jobModel.setStdOut(submissionOutput.getStdOut());
+
+                String jobId = submissionOutput.getJobId();
+
+                if (jobId != null && !jobId.isEmpty()) {
+                    jobModel.setJobId(jobId);
+                    saveJobModel(jobModel);
+                    JobStatus jobStatus = new JobStatus();
+                    jobStatus.setJobState(JobState.SUBMITTED);
+                    jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
+                    jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    jobModel.setJobStatuses(Arrays.asList(jobStatus));
+                    saveJobStatus(jobModel);
+
+                    return null;
+                } else {
+                    String msg = "expId:" + getExperimentId() + " Couldn't find remote jobId for JobName:" +
+                            jobModel.getJobName() + ", both submit and verify steps don't return a valid JobId. " +
+                            "Hence changing experiment state to Failed";
+                    return onFail(msg, false, null);
+                }
+
+            }
+            return null;
+
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    @Override
+    public void onCancel() {
+
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
new file mode 100644
index 0000000..fe5a3dc
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
@@ -0,0 +1,202 @@
+package org.apache.airavata.helix.impl.task.submission.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.agents.api.JobSubmissionOutput;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
+import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.helix.HelixManager;
+
+import java.io.File;
+import java.util.*;
+
+public abstract class JobSubmissionTask extends AiravataTask {
+
+
+
+    @Override
+    public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
+        super.init(manager, workflowName, jobName, taskName);
+    }
+
+    //////////////////////
+    protected JobSubmissionOutput submitBatchJob(AgentAdaptor agentAdaptor, File jobFile, String workingDirectory) throws Exception {
+        JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
+                getAppCatalog(), getJobSubmissionProtocol(), getPreferredJobSubmissionInterface()));
+        RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobFile.getPath());
+        CommandOutput commandOutput = agentAdaptor.executeCommand(submitCommand.getRawCommand(), workingDirectory);
+
+        JobSubmissionOutput jsoutput = new JobSubmissionOutput();
+
+        jsoutput.setJobId(jobManagerConfiguration.getParser().parseJobSubmission(commandOutput.getStdOut()));
+        if (jsoutput.getJobId() == null) {
+            if (jobManagerConfiguration.getParser().isJobSubmissionFailed(commandOutput.getStdOut())) {
+                jsoutput.setJobSubmissionFailed(true);
+                jsoutput.setFailureReason("stdout : " + commandOutput.getStdOut() +
+                        "\n stderr : " + commandOutput.getStdError());
+            }
+        }
+        jsoutput.setExitCode(commandOutput.getExitCode());
+        if (jsoutput.getExitCode() != 0) {
+            jsoutput.setJobSubmissionFailed(true);
+            jsoutput.setFailureReason("stdout : " + commandOutput.getStdOut() +
+                    "\n stderr : " + commandOutput.getStdError());
+        }
+        jsoutput.setStdOut(commandOutput.getStdOut());
+        jsoutput.setStdErr(commandOutput.getStdError());
+        return jsoutput;
+
+    }
+
+    public JobStatus getJobStatus(AgentAdaptor agentAdaptor, String jobID) throws Exception {
+        JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
+                getAppCatalog(), getJobSubmissionProtocol(), getPreferredJobSubmissionInterface()));
+        CommandOutput commandOutput = agentAdaptor.executeCommand(jobManagerConfiguration.getMonitorCommand(jobID).getRawCommand(), null);
+
+        return jobManagerConfiguration.getParser().parseJobStatus(jobID, commandOutput.getStdOut());
+
+    }
+
+    public String getJobIdByJobName(AgentAdaptor agentAdaptor, String jobName, String userName) throws Exception {
+        JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
+                getAppCatalog(), getJobSubmissionProtocol(), getPreferredJobSubmissionInterface()));
+
+        RawCommandInfo jobIdMonitorCommand = jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName);
+        CommandOutput commandOutput = agentAdaptor.executeCommand(jobIdMonitorCommand.getRawCommand(), null);
+        return jobManagerConfiguration.getParser().parseJobId(jobName, commandOutput.getStdOut());
+    }
+
+    ////////////////////////////////
+
+
+    /////////////////////////////////////////////
+    public void saveExperimentError(ErrorModel errorModel) throws Exception {
+        try {
+            errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
+            getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, getExperimentId());
+        } catch (RegistryException e) {
+            String msg = "expId: " + getExperimentId() + " processId: " + getProcessId()
+                    + " : - Error while updating experiment errors";
+            throw new Exception(msg, e);
+        }
+    }
+
+    public void saveProcessError(ErrorModel errorModel) throws Exception {
+        try {
+            errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
+            getExperimentCatalog().add(ExpCatChildDataType.PROCESS_ERROR, errorModel, getProcessId());
+        } catch (RegistryException e) {
+            String msg = "expId: " + getExperimentId() + " processId: " + getProcessId()
+                    + " : - Error while updating process errors";
+            throw new Exception(msg, e);
+        }
+    }
+
+    public void saveTaskError(ErrorModel errorModel) throws Exception {
+        try {
+            errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR"));
+            getExperimentCatalog().add(ExpCatChildDataType.TASK_ERROR, errorModel, getTaskId());
+        } catch (RegistryException e) {
+            String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " taskId: " + getTaskId()
+                    + " : - Error while updating task errors";
+            throw new Exception(msg, e);
+        }
+    }
+
+    public void saveJobModel(JobModel jobModel) throws RegistryException {
+        getExperimentCatalog().add(ExpCatChildDataType.JOB, jobModel, getProcessId());
+    }
+
+    public void saveJobStatus(JobModel jobModel) throws Exception {
+        try {
+            // first we save the job model to the registry and then save the job status
+            JobStatus jobStatus = null;
+            if (jobModel.getJobStatuses() != null) {
+                jobStatus = jobModel.getJobStatuses().get(0);
+            }
+
+            List<JobStatus> statuses = new ArrayList<>();
+            statuses.add(jobStatus);
+            jobModel.setJobStatuses(statuses);
+
+            // any non-negative timestamp is overwritten with the current time
+            if (jobStatus.getTimeOfStateChange() >= 0) {
+                jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            }
+
+            CompositeIdentifier ids = new CompositeIdentifier(jobModel.getTaskId(), jobModel.getJobId());
+            getExperimentCatalog().add(ExpCatChildDataType.JOB_STATUS, jobStatus, ids);
+            JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), jobModel.getTaskId(),
+                    getProcessId(), getProcessModel().getExperimentId(), getGatewayId());
+
+            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
+            MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
+                    (MessageType.JOB.name()), getGatewayId());
+            msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            getStatusPublisher().publish(msgCtx);
+        } catch (Exception e) {
+            throw new Exception("Error persisting job status: " + e.getLocalizedMessage(), e);
+        }
+    }
+
+    ///////////// required for groovy map
+
+    private String workingDir;
+    private String scratchLocation;
+    private UserComputeResourcePreference userComputeResourcePreference;
+
+    public String getWorkingDir() {
+        if (workingDir == null) {
+            if (getProcessModel().getProcessResourceSchedule().getStaticWorkingDir() != null) {
+                workingDir = getProcessModel().getProcessResourceSchedule().getStaticWorkingDir();
+            } else {
+                String scratchLocation = getScratchLocation();
+                workingDir = (scratchLocation.endsWith("/") ? scratchLocation + getProcessId() : scratchLocation + "/" +
+                        getProcessId());
+            }
+        }
+        return workingDir;
+    }
+
+    public String getScratchLocation() {
+        if (scratchLocation == null) {
+            if (isUseUserCRPref() &&
+                    userComputeResourcePreference != null &&
+                    isValid(userComputeResourcePreference.getScratchLocation())) {
+                scratchLocation = userComputeResourcePreference.getScratchLocation();
+            } else if (isValid(processModel.getProcessResourceSchedule().getOverrideScratchLocation())) {
+                scratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation();
+            } else {
+                scratchLocation = gatewayComputeResourcePreference.getScratchLocation();
+            }
+        }
+        return scratchLocation;
+    }
+
+    protected UserComputeResourcePreference userComputeResourcePreference() throws AppCatalogException {
+        UserComputeResourcePreference userComputeResourcePreference =
+                getAppCatalog().getUserResourceProfile().getUserComputeResourcePreference(
+                        getProcessModel().getUserName(),
+                        getGatewayId(),
+                        getProcessModel().getComputeResourceId());
+        return userComputeResourcePreference;
+    }
+
+}
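
getWorkingDir() above derives the working directory by appending the process id to the scratch location while guarding against a double slash. A tiny standalone version of that rule with made-up paths:

    class WorkingDirSketch {
        // Mirrors JobSubmissionTask.getWorkingDir() when no static working directory is configured.
        static String workingDir(String scratchLocation, String processId) {
            return scratchLocation.endsWith("/") ? scratchLocation + processId
                    : scratchLocation + "/" + processId;
        }

        public static void main(String[] args) {
            // both print /scratch/testuser/PROCESS_1234
            System.out.println(workingDir("/scratch/testuser/", "PROCESS_1234"));
            System.out.println(workingDir("/scratch/testuser", "PROCESS_1234"));
        }
    }
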
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
new file mode 100644
index 0000000..5a3ca31
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
@@ -0,0 +1,81 @@
+package org.apache.airavata.helix.impl.task.submission.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.JobSubmissionOutput;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.impl.task.submission.GroovyMapData;
+import org.apache.airavata.helix.impl.task.submission.SubmissionUtil;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskResult;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.UUID;
+
+@TaskDef(name = "Local Job Submission")
+public class LocalJobSubmissionTask extends JobSubmissionTask {
+
+    @Override
+    public TaskResult onRun(TaskHelper taskHelper) {
+
+        try {
+            GroovyMapData groovyMapData = new GroovyMapData();
+            String jobId = "JOB_ID_" + UUID.randomUUID().toString();
+
+            JobModel jobModel = new JobModel();
+            jobModel.setProcessId(getProcessId());
+            jobModel.setWorkingDir(groovyMapData.getWorkingDirectory());
+            jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+            jobModel.setTaskId(getTaskId());
+            jobModel.setJobId(jobId);
+
+            File jobFile = SubmissionUtil.createJobFile(groovyMapData);
+
+            if (jobFile != null && jobFile.exists()) {
+                jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+                saveJobModel(jobModel);
+
+                AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(),
+                        getJobSubmissionProtocol().name(), getComputeResourceCredentialToken());
+
+                JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, jobFile, groovyMapData.getWorkingDirectory());
+
+                JobStatus jobStatus = new JobStatus();
+                jobStatus.setJobState(JobState.SUBMITTED);
+                jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
+                jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                jobModel.setJobStatuses(Arrays.asList(jobStatus));
+
+                saveJobStatus(jobModel);
+
+                jobModel.setExitCode(submissionOutput.getExitCode());
+                jobModel.setStdErr(submissionOutput.getStdErr());
+                jobModel.setStdOut(submissionOutput.getStdOut());
+
+                jobStatus.setJobState(JobState.COMPLETE);
+                jobStatus.setReason("Successfully Completed " + getComputeResourceDescription().getHostName());
+                jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                jobModel.setJobStatuses(Arrays.asList(jobStatus));
+
+                saveJobStatus(jobModel);
+
+                return null;
+            }
+
+            return null;
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    @Override
+    public void onCancel() {
+
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
new file mode 100644
index 0000000..51feff4
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
@@ -0,0 +1,31 @@
+package org.apache.airavata.helix.impl.workflow;
+
+import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.impl.task.EnvSetupTask;
+import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask;
+import org.apache.airavata.helix.workflow.WorkflowManager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+public class SimpleWorkflow {
+
+    public static void main(String[] args) throws Exception {
+
+        EnvSetupTask envSetupTask = new EnvSetupTask();
+        envSetupTask.setWorkingDirectory("/tmp/a");
+
+        DefaultJobSubmissionTask defaultJobSubmissionTask = new DefaultJobSubmissionTask();
+        defaultJobSubmissionTask.setGatewayId("default");
+        defaultJobSubmissionTask.setExperimentId("Clone_of_Mothur-Test1_0c9f627e-2c32-403e-a28a-2a8b10c21c1a");
+        defaultJobSubmissionTask.setProcessId("PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6");
+        defaultJobSubmissionTask.setTaskId(UUID.randomUUID().toString());
+
+        List<AbstractTask> tasks = new ArrayList<>();
+        tasks.add(defaultJobSubmissionTask);
+
+        WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22", "localhost:2199");
+        workflowManager.launchWorkflow(UUID.randomUUID().toString(), tasks, true);
+    }
+}
diff --git a/modules/helix-spectator/src/main/resources/airavata-server.properties b/modules/helix-spectator/src/main/resources/airavata-server.properties
new file mode 100644
index 0000000..5f47d79
--- /dev/null
+++ b/modules/helix-spectator/src/main/resources/airavata-server.properties
@@ -0,0 +1,334 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+###########################################################################
+#
+#  This properties file provides configuration for all Airavata Services:
+#  API Server, Registry, Workflow Interpreter, GFac, Orchestrator
+#
+###########################################################################
+
+###########################################################################
+#  API Server Registry Configuration
+###########################################################################
+
+#for derby [AiravataJPARegistry]
+#registry.jdbc.driver=org.apache.derby.jdbc.ClientDriver
+#registry.jdbc.url=jdbc:derby://localhost:1527/experiment_catalog;create=true;user=airavata;password=airavata
+# MariaDB database configuration
+registry.jdbc.driver=org.mariadb.jdbc.Driver
+registry.jdbc.url=jdbc:mariadb://149.165.168.248:3306/experiment_catalog
+registry.jdbc.user=eroma
+registry.jdbc.password=eroma123456
+#FIXME: Probably the following property should be removed.
+start.derby.server.mode=false
+validationQuery=SELECT 1 from CONFIGURATION
+cache.enable=false
+jpa.cache.size=-1
+#jpa.connection.properties=MaxActive=10,MaxIdle=5,MinIdle=2,MaxWait=60000,testWhileIdle=true,testOnBorrow=true
+enable.sharing=true
+
+# Properties for default user mode
+default.registry.user=default-admin
+default.registry.password=123456
+default.registry.password.hash.method=SHA
+default.registry.gateway=default
+super.tenant.gatewayId=default
+
+# Properties for cluster status monitoring
+# cluster status monitoring job repeat time in seconds
+cluster.status.monitoring.enable=false
+cluster.status.monitoring.repeat.time=18000
+
+###########################################################################
+#  Application Catalog DB Configuration
+###########################################################################
+#for derby [AiravataJPARegistry]
+#appcatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver
+#appcatalog.jdbc.url=jdbc:derby://localhost:1527/app_catalog;create=true;user=airavata;password=airavata
+# MariaDB database configuration
+appcatalog.jdbc.driver=org.mariadb.jdbc.Driver
+appcatalog.jdbc.url=jdbc:mariadb://149.165.168.248:3306/app_catalog
+appcatalog.jdbc.user=eroma
+appcatalog.jdbc.password=eroma123456
+appcatalog.validationQuery=SELECT 1 from CONFIGURATION
+
+##########################################################################
+#  Replica Catalog DB Configuration
+###########################################################################
+#for derby [AiravataJPARegistry]
+#replicacatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver
+#replicacatalog.jdbc.url=jdbc:derby://localhost:1527/replica_catalog;create=true;user=airavata;password=airavata
+# MariaDB database configuration
+replicacatalog.jdbc.driver=org.mariadb.jdbc.Driver
+replicacatalog.jdbc.url=jdbc:mariadb://149.165.168.248:3306/replica_catalog
+replicacatalog.jdbc.user=eroma
+replicacatalog.jdbc.password=eroma123456
+replicacatalog.validationQuery=SELECT 1 from CONFIGURATION
+
+###########################################################################
+#  Workflow Catalog DB Configuration
+###########################################################################
+#for derby [AiravataJPARegistry]
+#workflowcatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver
+#workflowcatalog.jdbc.url=jdbc:derby://localhost:1527/workflow_catalog;create=true;user=airavata;password=airavata
+# MariaDB database configuration
+workflowcatalog.jdbc.driver=org.mariadb.jdbc.Driver
+workflowcatalog.jdbc.url=jdbc:mariadb://149.165.168.248:3306/workflow_catalog
+workflowcatalog.jdbc.user=eroma
+workflowcatalog.jdbc.password=eroma123456
+workflowcatalog.validationQuery=SELECT 1 from CONFIGURATION
+
+###########################################################################
+#  Sharing Catalog DB Configuration
+###########################################################################
+#for derby [AiravataJPARegistry]
+#sharingcatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver
+#sharingcatalog.jdbc.url=jdbc:derby://localhost:1527/sharing_catalog;create=true;user=airavata;password=airavata
+# MariaDB database configuration
+sharingcatalog.jdbc.driver=org.mariadb.jdbc.Driver
+sharingcatalog.jdbc.url=jdbc:mariadb://149.165.168.248:3306/sharing_catalog
+sharingcatalog.jdbc.user=eroma
+sharingcatalog.jdbc.password=eroma123456
+sharingcatalog.validationQuery=SELECT 1 from CONFIGURATION
+
+###########################################################################
+#  Sharing Registry Server Configuration
+###########################################################################
+sharing_server=org.apache.airavata.sharing.registry.server.SharingRegistryServer
+sharing.registry.server.host=192.168.99.102
+sharing.registry.server.port=7878
+
+###########################################################################
+#  User Profile MongoDB Configuration
+###########################################################################
+userprofile.mongodb.host=localhost
+userprofile.mongodb.port=27017
+
+
+###########################################################################
+#  Server module Configuration
+###########################################################################
+# The credential store server should be started before the API server.
+# This is an obsolete property with the new script files.
+#servers=credentialstore,apiserver,orchestrator
+
+
+###########################################################################
+#  API Server Configurations
+###########################################################################
+apiserver=org.apache.airavata.api.server.AiravataAPIServer
+apiserver.name=apiserver-node0
+apiserver.host=192.168.99.102
+apiserver.port=8930
+apiserver.min.threads=50
+
+###########################################################################
+#  Orchestrator Server Configurations
+###########################################################################
+orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
+orchestrator.server.name=orchestrator-node0
+orchestrator.server.host=192.168.99.102
+orchestrator.server.port=8940
+orchestrator.server.min.threads=50
+job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
+submitter.interval=10000
+threadpool.size=10
+start.submitter=true
+embedded.mode=true
+enable.validation=true
+
+###########################################################################
+#  Registry Server Configurations
+###########################################################################
+regserver=org.apache.airavata.registry.api.service.RegistryAPIServer
+regserver.server.name=regserver-node0
+regserver.server.host=192.168.99.102
+regserver.server.port=8970
+regserver.server.min.threads=50
+
+###########################################################################
+#  GFac Server Configurations
+###########################################################################
+gfac=org.apache.airavata.gfac.server.GfacServer
+gfac.server.name=gfac-node0
+gfac.server.host=10.0.2.15
+gfac.server.port=8950
+gfac.thread.pool.size=50
+host.scheduler=org.apache.airavata.gfac.impl.DefaultHostScheduler
+
+
+
+###########################################################################
+# Airavata Workflow Interpreter Configurations
+###########################################################################
+workflowserver=org.apache.airavata.api.server.WorkflowServer
+enactment.thread.pool.size=10
+
+# To define a custom workflow parser, use the following property:
+#workflow.parser=org.apache.airavata.workflow.core.parser.AiravataWorkflowBuilder
+
+
+
+###########################################################################
+#  The job scheduler can send informative email messages about the status of your job.
+#  Specify a string consisting of either the single character "n" (no mail), or one or more
+#  of the characters "a" (send mail when the job is aborted), "b" (send mail when the job begins),
+#  and "e" (send mail when the job terminates). The default is "a" if not specified.
+###########################################################################
+
+job.notification.enable=true
+# Provide comma-separated email ids as a single string if there is more than one
+job.notification.emailids=
+job.notification.flags=abe
+
+###########################################################################
+# Credential Store module Configuration
+###########################################################################
+credential.store.keystore.url=/home/pga/master-deployment/keystores/cred_store.jks
+credential.store.keystore.alias=seckey
+credential.store.keystore.password=123456
+credential.store.jdbc.url=jdbc:mariadb://149.165.168.248:3306/credential_store
+credential.store.jdbc.user=eroma
+credential.store.jdbc.password=eroma123456
+credential.store.jdbc.driver=org.mariadb.jdbc.Driver
+credential.store.server.host=192.168.99.102
+credential.store.server.port=8960
+credentialstore=org.apache.airavata.credential.store.server.CredentialStoreServer
+credential.stroe.jdbc.validationQuery=SELECT 1 from CONFIGURATION
+
+# These properties are used by credential store email notifications
+email.server=smtp.googlemail.com
+email.server.port=465
+email.user=airavata
+email.password=xxx
+email.ssl=true
+email.from=airavata@apache.org
+
+# An SSH PKI key pair or an SSH password can be used for SSH based authentication.
+# If the user specifies both, password authentication gets the higher preference.
+
+################# ---------- For ssh key pair authentication ------------------- ################
+#ssh.public.key=/path to public key for ssh
+#ssh.private.key=/path to private key file for ssh
+#ssh.keypass=passphrase for the private key
+#ssh.username=username for ssh connection
+## If you set "yes" for ssh.strict.hostKey.checking, then you must provide known hosts file path
+#ssh.strict.hostKey.checking=yes/no
+#ssh.known.hosts.file=/path to known hosts file
+### In case of password authentication.
+#ssh.password=Password for ssh connection
+
+################ ---------- BES Properties ------------------- ###############
+#bes.ca.cert.path=<location>/certificates/cacert.pem
+#bes.ca.key.path=<location>/certificates/cakey.pem
+#bes.ca.key.pass=passphrase
+
+###########################################################################
+# Monitoring module Configuration
+###########################################################################
+
+# This is the primary monitoring tool that runs in Airavata; in the future there will be multiple
+# monitoring mechanisms and it will be possible to start more than one monitor.
+monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.gfac.monitor.impl.LocalJobMonitor
+
+# These properties are used to enable email-based monitoring (a usage sketch follows this file's diff)
+email.based.monitor.host=imap.gmail.com
+email.based.monitor.address=ansibletestairavata@gmail.com
+email.based.monitor.password=ansibletestairavata123
+email.based.monitor.folder.name=INBOX
+# either imaps or pop3
+email.based.monitor.store.protocol=imaps
+# This property sets how often the email server is queried; the value is in milliseconds (ms).
+email.based.monitoring.period=10000
+
+###########################################################################
+# AMQP Notification Configuration
+###########################################################################
+# For simple scenarios the guest user can be used:
+#rabbitmq.broker.url=amqp://localhost:5672
+# For production scenarios, give the url as amqp://userName:password@hostName:portNumber/virtualHost; create the user
+# and virtual host, and grant permissions, refer: http://blog.dtzq.com/2012/06/rabbitmq-users-and-virtual-hosts.html (a connection sketch follows this file's diff)
+rabbitmq.broker.url=amqp://airavata:123456@192.168.99.102:5672/master
+rabbitmq.status.exchange.name=status_exchange
+rabbitmq.process.exchange.name=process_exchange
+rabbitmq.experiment.exchange.name=experiment_exchange
+durable.queue=false
+prefetch.count=200
+process.launch.queue.name=process.launch.queue
+experiment.launch..queue.name=experiment.launch.queue
+
+###########################################################################
+# Zookeeper Server Configuration
+###########################################################################
+embedded.zk=false
+zookeeper.server.connection=192.168.99.102:2181
+zookeeper.timeout=30000
+
+########################################################################
+## API Security Configuration
+########################################################################
+api.secured=false
+security.manager.class=org.apache.airavata.service.security.KeyCloakSecurityManager
+### TLS related configuration ####
+TLS.enabled=true
+TLS.api.server.port=9930
+TLS.client.timeout=10000
+#### keystore configuration ####
+keystore.path=/home/pga/master-deployment/keystores/airavata.jks
+keystore.password=password
+#### trust store configuration ####
+trust.store=/home/pga/master-deployment/keystores/client_truststore.jks
+trust.store.password=password
+#### authorization cache related configuration ####
+authz.cache.enabled=true
+authz.cache.manager.class=org.apache.airavata.service.security.authzcache.DefaultAuthzCacheManager
+in.memory.cache.size=1000
+
+# Kafka Logging related configuration
+isRunningOnAws=false
+kafka.broker.list=localhost:9092
+kafka.topic.prefix=local
+enable.kafka.logging=false
+
+###########################################################################
+# Profile Service Configuration
+###########################################################################
+profile.service.server.host=192.168.99.102
+profile.service.server.port=8962
+profile_service=org.apache.airavata.service.profile.server.ProfileServiceServer
+# MariaDB properties
+profile.service.jdbc.url=jdbc:mariadb://149.165.168.248:3306/profile_service
+profile.service.jdbc.user=eroma
+profile.service.jdbc.password=eroma123456
+profile.service.jdbc.driver=org.mariadb.jdbc.Driver
+profile.service.validationQuery=SELECT 1
+
+###########################################################################
+# Iam Admin services Configuration
+###########################################################################
+iam.server.url=https://192.168.99.102/auth
+iam.server.super.admin.username=admin
+iam.server.super.admin.password=123456
+
+###########################################################################
+# DB Event Manager Runner
+###########################################################################
+db_event_manager=org.apache.airavata.db.event.manager.DBEventManagerRunner
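
As a point of reference for the email-based monitoring block above, the following is a
hypothetical sketch of how the email.based.monitor.* values could be used to poll a
mailbox with the standard javax.mail API. It is not the monitor implementation shipped
in this commit; the class name and the status-parsing step are placeholders.

    import javax.mail.Folder;
    import javax.mail.Session;
    import javax.mail.Store;
    import java.util.Properties;

    public class EmailMonitorSketch {
        public static void main(String[] args) throws Exception {
            String protocol = "imaps";                        // email.based.monitor.store.protocol
            String host = "imap.gmail.com";                   // email.based.monitor.host
            String address = "ansibletestairavata@gmail.com"; // email.based.monitor.address
            String password = "...";                          // email.based.monitor.password
            String folderName = "INBOX";                      // email.based.monitor.folder.name
            long periodMs = 10000;                            // email.based.monitoring.period

            Properties props = new Properties();
            props.put("mail.store.protocol", protocol);
            Session session = Session.getInstance(props);

            while (true) {
                Store store = session.getStore(protocol);
                store.connect(host, address, password);
                Folder folder = store.getFolder(folderName);
                folder.open(Folder.READ_ONLY);

                // A real monitor would parse scheduler notification emails here
                System.out.println("Unread messages: " + folder.getUnreadMessageCount());

                folder.close(false);
                store.close();
                Thread.sleep(periodMs); // query the server at the configured interval
            }
        }
    }
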
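Similarly, a small sketch of how the rabbitmq.broker.url format described above
(amqp://userName:password@hostName:portNumber/virtualHost) is consumed by the standard
RabbitMQ Java client. The exchange name and prefetch count mirror the properties; the
exchange type and the surrounding code are assumptions for illustration only.

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class BrokerUrlSketch {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            // The amqp URI carries user, password, host, port and virtual host in one string
            factory.setUri("amqp://airavata:123456@192.168.99.102:5672/master"); // rabbitmq.broker.url

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // rabbitmq.status.exchange.name, durable.queue=false, prefetch.count=200
            channel.exchangeDeclare("status_exchange", "topic", false);
            channel.basicQos(200);

            channel.close();
            connection.close();
        }
    }
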
diff --git a/modules/helix-spectator/src/main/resources/application.properties b/modules/helix-spectator/src/main/resources/application.properties
new file mode 100644
index 0000000..41c5e5f
--- /dev/null
+++ b/modules/helix-spectator/src/main/resources/application.properties
@@ -0,0 +1,3 @@
+zookeeper.connection.url=localhost:2199
+helix.cluster.name=AiravataDemoCluster
+participant.name=all-p1
\ No newline at end of file
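
For context, a minimal sketch (not the participant code in this commit) of how these
three properties could be fed into a Helix connection with the standard Apache Helix
client API; the participant.name is reused as the Helix instance name and the connection
is shown as a spectator for simplicity.

    import org.apache.helix.HelixManager;
    import org.apache.helix.HelixManagerFactory;
    import org.apache.helix.InstanceType;

    import java.io.InputStream;
    import java.util.Properties;

    public class SpectatorBootstrapSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            try (InputStream in = SpectatorBootstrapSketch.class.getClassLoader()
                    .getResourceAsStream("application.properties")) {
                props.load(in);
            }

            // Connect using the configured cluster, instance and ZooKeeper address
            HelixManager manager = HelixManagerFactory.getZKHelixManager(
                    props.getProperty("helix.cluster.name"),        // AiravataDemoCluster
                    props.getProperty("participant.name"),          // all-p1
                    InstanceType.SPECTATOR,
                    props.getProperty("zookeeper.connection.url")); // localhost:2199
            manager.connect();
            Runtime.getRuntime().addShutdownHook(new Thread(manager::disconnect));
        }
    }
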
diff --git a/modules/helix-spectator/src/main/resources/log4j.properties b/modules/helix-spectator/src/main/resources/log4j.properties
new file mode 100644
index 0000000..e910f32
--- /dev/null
+++ b/modules/helix-spectator/src/main/resources/log4j.properties
@@ -0,0 +1,11 @@
+# Set root logger level to INFO and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+log4j.category.org.apache.helix=WARN
+log4j.category.org.apache.zookeeper=WARN
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 91202d7..bd99a2d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -696,6 +696,8 @@
                 <module>modules/distribution</module>
                 <!--<module>modules/test-suite</module>-->
                 <module>modules/compute-account-provisioning</module>
+                <module>modules/airavata-helix</module>
+                <module>modules/helix-spectator</module>
             </modules>
         </profile>
         <profile>

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.