Posted to commits@airavata.apache.org by sm...@apache.org on 2017/12/06 03:14:02 UTC

[airavata-sandbox] 18/19: Initial agent implementation

This is an automated email from the ASF dual-hosted git repository.

smarru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-sandbox.git

commit e42d3f65f1d326b06d106f529f4e4144aab051cb
Author: dimuthu.upeksha2@gmail.com <Di...@1234>
AuthorDate: Sat Dec 2 23:00:56 2017 +0530

    Initial agent implementation
---
 .../modules/agents/agent-core/pom.xml              |   40 +
 .../airavata/agents/core/AsyncOperation.java       |   24 +
 .../airavata/agents/core/StatusPublisher.java      |   49 +
 .../modules/agents/thrift-agent/pom.xml            |   47 +
 .../agents/thrift/handler/OperationHandler.java    |   34 +
 .../thrift/operation/ThriftAgentOperation.java     |   45 +
 .../agents/thrift/server/OperationServer.java      |   51 +
 .../agents/thrift/stubs/OperationException.java    |  476 +++++++++
 .../agents/thrift/stubs/OperationService.java      | 1072 ++++++++++++++++++++
 .../src/main/resources/application.properties      |    2 +
 .../thrift-agent/src/main/resources/schema.thrift  |   11 +
 .../process/ProcessBootstrapDataResource.java      |   40 +
 .../k8s/api/resources/process/ProcessResource.java |   10 +
 .../apache/airavata/helix/api/AbstractTask.java    |   24 +-
 .../airavata/helix/api/HelixParticipant.java       |    7 +-
 .../api/server/controller/WorkflowController.java  |   10 +-
 .../server/model/process/ProcessBootstrapData.java |   63 ++
 .../k8s/api/server/model/process/ProcessModel.java |   12 +
 .../process/ProcessBootstrapDataRepository.java    |   13 +
 .../k8s/api/server/service/ProcessService.java     |   15 +-
 .../k8s/api/server/service/WorkflowService.java    |   15 +-
 .../api/server/service/util/ToResourceUtil.java    |   17 +
 .../microservices/async-event-listener/pom.xml     |  129 +++
 .../airavata/async/event/listener/Application.java |   29 +
 .../event/listener/messaging/KafkaReceiver.java    |   52 +
 .../event/listener/messaging/ReceiverConfig.java   |   82 ++
 .../event/listener/service/ListenerService.java    |   35 +
 .../src/main/resources/application.properties      |    4 +
 .../src/main/resources/application.yml             |    4 +
 .../microservices/tasks/async-command-task/pom.xml |  118 +++
 .../task/async/command/AsyncCommandTask.java}      |   65 +-
 .../helix/task/async}/command/Participant.java     |    9 +-
 .../src/main/resources/application.properties      |    8 +
 .../src/main/resources/log4j.properties            |    9 +
 .../airavata/helix/task/command/CommandTask.java   |    5 +-
 .../airavata/helix/task/command/Participant.java   |    2 +-
 .../src/main/resources/application.properties      |    2 +
 .../airavata/helix/task/datain/DataInputTask.java  |    5 +-
 .../airavata/helix/task/datain/Participant.java    |    2 +-
 .../src/main/resources/application.properties      |    4 +-
 .../helix/task/dataout/DataOutputTask.java         |    5 +-
 .../airavata/helix/task/dataout/Participant.java   |    2 +-
 .../src/main/resources/application.properties      |    4 +-
 .../k8s/gfac/core/HelixWorkflowManager.java        |   24 +-
 .../airavata/k8s/gfac/service/WorkerService.java   |    3 +-
 airavata-kubernetes/pom.xml                        |    4 +
 46 files changed, 2607 insertions(+), 76 deletions(-)

diff --git a/airavata-kubernetes/modules/agents/agent-core/pom.xml b/airavata-kubernetes/modules/agents/agent-core/pom.xml
new file mode 100644
index 0000000..e8ce15d
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/agent-core/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata-kubernetes</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>agent-core</artifactId>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>api-resource</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/agents/agent-core/src/main/java/org/apache/airavata/agents/core/AsyncOperation.java b/airavata-kubernetes/modules/agents/agent-core/src/main/java/org/apache/airavata/agents/core/AsyncOperation.java
new file mode 100644
index 0000000..cc488a0
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/agent-core/src/main/java/org/apache/airavata/agents/core/AsyncOperation.java
@@ -0,0 +1,24 @@
+package org.apache.airavata.agents.core;
+
+import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public abstract class AsyncOperation {
+
+    private ComputeResource computeResource;
+
+    public AsyncOperation(ComputeResource computeResource) {
+        this.computeResource = computeResource;
+    }
+
+    public abstract void executeCommandAsync(String command, long callbackWorkflowId);
+
+    public ComputeResource getComputeResource() {
+        return computeResource;
+    }
+}
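AsyncOperation defines the contract an agent transport has to satisfy: it is constructed with the target ComputeResource and asynchronously runs a command tied to a callback workflow id. As an illustration only (not part of this commit), a minimal subclass could look like the following; the class name is hypothetical, and it relies on the getHost() accessor that ComputeResource is shown to have elsewhere in this patch:

    import org.apache.airavata.agents.core.AsyncOperation;
    import org.apache.airavata.k8s.api.resources.compute.ComputeResource;

    public class LoggingAgentOperation extends AsyncOperation {

        public LoggingAgentOperation(ComputeResource computeResource) {
            super(computeResource);
        }

        @Override
        public void executeCommandAsync(String command, long callbackWorkflowId) {
            // No real transport here; just record what would be dispatched.
            System.out.println("Would execute '" + command + "' on "
                    + getComputeResource().getHost()
                    + " for callback workflow " + callbackWorkflowId);
        }
    }
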
diff --git a/airavata-kubernetes/modules/agents/agent-core/src/main/java/org/apache/airavata/agents/core/StatusPublisher.java b/airavata-kubernetes/modules/agents/agent-core/src/main/java/org/apache/airavata/agents/core/StatusPublisher.java
new file mode 100644
index 0000000..71af84e
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/agent-core/src/main/java/org/apache/airavata/agents/core/StatusPublisher.java
@@ -0,0 +1,49 @@
+package org.apache.airavata.agents.core;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class StatusPublisher {
+    private String brokerUrl;
+    private String topicName;
+
+    private Producer<String, String> eventProducer;
+
+    public StatusPublisher(String brokerUrl, String topicName) {
+        this.brokerUrl = brokerUrl;
+        this.topicName = topicName;
+        this.initializeKafkaEventProducer();
+    }
+
+    public void publishStatus(long callbackWorkflowId, String status, String message) {
+        this.eventProducer.send(new ProducerRecord<String, String>(
+                this.topicName, String.join(",", callbackWorkflowId + "", status, message)));
+    }
+
+    public void initializeKafkaEventProducer() {
+        Properties props = new Properties();
+
+        props.put("bootstrap.servers", this.brokerUrl);
+
+        props.put("acks", "all");
+        props.put("retries", 0);
+        props.put("batch.size", 16384);
+        props.put("linger.ms", 1);
+        props.put("buffer.memory", 33554432);
+        props.put("key.serializer",
+                "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer",
+                "org.apache.kafka.common.serialization.StringSerializer");
+
+        eventProducer = new KafkaProducer<String, String>(props);
+    }
+}
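StatusPublisher writes each status event to Kafka as a single comma-joined string of the form "<callbackWorkflowId>,<status>,<message>", so a listener only needs to split on the first two commas. A hedged consumer sketch using the same kafka-clients library (broker address, group id, and topic name are placeholders, not values from this commit):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class StatusConsumerSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
            props.put("group.id", "status-listeners");          // placeholder group id
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("status-topic")); // placeholder topic

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Limit the split to 3 so commas inside the message text survive.
                    String[] parts = record.value().split(",", 3);
                    long workflowId = Long.parseLong(parts[0]);
                    String status = parts[1];
                    String message = parts.length > 2 ? parts[2] : "";
                    System.out.println(workflowId + " -> " + status + ": " + message);
                }
            }
        }
    }
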
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/pom.xml b/airavata-kubernetes/modules/agents/thrift-agent/pom.xml
new file mode 100644
index 0000000..1bfb0b8
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata-kubernetes</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>thrift-agent</artifactId>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <!-- https://mvnrepository.com/artifact/org.apache.thrift/libthrift -->
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>0.10.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>agent-core</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>helix-task-api</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/handler/OperationHandler.java b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/handler/OperationHandler.java
new file mode 100644
index 0000000..c402a83
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/handler/OperationHandler.java
@@ -0,0 +1,34 @@
+package org.apache.airavata.agents.thrift.handler;
+
+import org.apache.airavata.agents.core.StatusPublisher;
+import org.apache.airavata.agents.thrift.stubs.OperationException;
+import org.apache.airavata.agents.thrift.stubs.OperationService;
+import org.apache.thrift.TException;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class OperationHandler extends StatusPublisher implements OperationService.Iface {
+
+    public OperationHandler(String brokerUrl, String topicName) {
+        super(brokerUrl, topicName);
+    }
+
+    @Override
+    public void executeCommand(String command, long callbackWorkflowId) throws OperationException, TException {
+        publishStatus(callbackWorkflowId, "PENDING", "Pending for execution");
+        publishStatus(callbackWorkflowId, "STARTED", "Starting command execution");
+        Runnable task = new Runnable() {
+            @Override
+            public void run() {
+                System.out.println("Executing command " + command);
+                publishStatus(callbackWorkflowId, "SUCCESS", "Command execution succeeded");
+            }
+        };
+
+        new Thread(task).start();
+    }
+}
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/operation/ThriftAgentOperation.java b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/operation/ThriftAgentOperation.java
new file mode 100644
index 0000000..cb9010b
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/operation/ThriftAgentOperation.java
@@ -0,0 +1,45 @@
+package org.apache.airavata.agents.thrift.operation;
+
+import org.apache.airavata.agents.core.AsyncOperation;
+import org.apache.airavata.agents.thrift.stubs.OperationService;
+import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class ThriftAgentOperation extends AsyncOperation {
+
+    private OperationService.Client client;
+
+    public ThriftAgentOperation(ComputeResource computeResource) {
+        super(computeResource);
+
+        try {
+            TTransport transport = new TSocket(computeResource.getHost(), 9090);
+            transport.open();
+            TProtocol protocol = new TBinaryProtocol(transport);
+            this.client = new OperationService.Client(protocol);
+
+        } catch (TTransportException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void executeCommandAsync(String command, long callbackWorkflowId) {
+        try {
+            client.executeCommand(command, callbackWorkflowId);
+        } catch (TException e) {
+            e.printStackTrace();
+        }
+    }
+}
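From the orchestrator side, the operation is built around the target ComputeResource and a command is fired together with the workflow id that should receive the status callback. A hedged usage sketch for whatever component schedules the command (it assumes ComputeResource follows the usual bean convention with a setHost(...) mutator; the host and workflow id are placeholders):

    ComputeResource resource = new ComputeResource();
    resource.setHost("agent-host.example.org");        // assumed setter, placeholder host

    AsyncOperation operation = new ThriftAgentOperation(resource);
    operation.executeCommandAsync("echo hello", 42L);  // 42L: placeholder callback workflow id
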
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/server/OperationServer.java b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/server/OperationServer.java
new file mode 100644
index 0000000..4eecd8f
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/server/OperationServer.java
@@ -0,0 +1,51 @@
+package org.apache.airavata.agents.thrift.server;
+
+import org.apache.airavata.agents.thrift.handler.OperationHandler;
+import org.apache.airavata.agents.thrift.stubs.OperationService;
+import org.apache.airavata.helix.api.PropertyResolver;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TSimpleServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+
+import java.io.IOException;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class OperationServer {
+
+    private static OperationHandler operationHandler;
+    private static OperationService.Processor processor;
+
+    public static void main(String args[]) throws IOException {
+        PropertyResolver resolver = new PropertyResolver();
+        resolver.loadInputStream(OperationService.class.getClassLoader().getResourceAsStream("application.properties"));
+        operationHandler = new OperationHandler(resolver.get("kafka.bootstrap.url"), resolver.get("async.event.listener.topic"));
+        processor = new OperationService.Processor(operationHandler);
+
+        Runnable server = new Runnable() {
+            @Override
+            public void run() {
+                simple(processor);
+            }
+        };
+
+        new Thread(server).start();
+    }
+
+    public static void simple(OperationService.Processor processor) {
+        try {
+            TServerTransport serverTransport = new TServerSocket(9090);
+            TServer server = new TSimpleServer(new TServer.Args(serverTransport).processor(processor));
+
+            System.out.println("Starting the operation server...");
+            server.serve();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
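The server resolves its broker endpoint and status topic from application.properties on the classpath (the two-line file listed in the diffstat above but not shown in this excerpt). Based on the keys read by OperationServer, it would contain entries of roughly this shape, with placeholder values:

    kafka.bootstrap.url=localhost:9092
    async.event.listener.topic=async-event-topic
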
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/stubs/OperationException.java b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/stubs/OperationException.java
new file mode 100644
index 0000000..0ab63f5
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/stubs/OperationException.java
@@ -0,0 +1,476 @@
+/**
+ * Autogenerated by Thrift Compiler (0.10.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.airavata.agents.thrift.stubs;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.10.0)", date = "2017-12-02")
+public class OperationException extends org.apache.thrift.TException implements org.apache.thrift.TBase<OperationException, OperationException._Fields>, java.io.Serializable, Cloneable, Comparable<OperationException> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OperationException");
+
+  private static final org.apache.thrift.protocol.TField MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("message", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField STACKTRACE_FIELD_DESC = new org.apache.thrift.protocol.TField("stacktrace", org.apache.thrift.protocol.TType.STRING, (short)2);
+
+  private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new OperationExceptionStandardSchemeFactory();
+  private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new OperationExceptionTupleSchemeFactory();
+
+  public java.lang.String message; // required
+  public java.lang.String stacktrace; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    MESSAGE((short)1, "message"),
+    STACKTRACE((short)2, "stacktrace");
+
+    private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+    static {
+      for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // MESSAGE
+          return MESSAGE;
+        case 2: // STACKTRACE
+          return STACKTRACE;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(java.lang.String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final java.lang.String _fieldName;
+
+    _Fields(short thriftId, java.lang.String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public java.lang.String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("message", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.STACKTRACE, new org.apache.thrift.meta_data.FieldMetaData("stacktrace", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(OperationException.class, metaDataMap);
+  }
+
+  public OperationException() {
+  }
+
+  public OperationException(
+    java.lang.String message,
+    java.lang.String stacktrace)
+  {
+    this();
+    this.message = message;
+    this.stacktrace = stacktrace;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public OperationException(OperationException other) {
+    if (other.isSetMessage()) {
+      this.message = other.message;
+    }
+    if (other.isSetStacktrace()) {
+      this.stacktrace = other.stacktrace;
+    }
+  }
+
+  public OperationException deepCopy() {
+    return new OperationException(this);
+  }
+
+  @Override
+  public void clear() {
+    this.message = null;
+    this.stacktrace = null;
+  }
+
+  public java.lang.String getMessage() {
+    return this.message;
+  }
+
+  public OperationException setMessage(java.lang.String message) {
+    this.message = message;
+    return this;
+  }
+
+  public void unsetMessage() {
+    this.message = null;
+  }
+
+  /** Returns true if field message is set (has been assigned a value) and false otherwise */
+  public boolean isSetMessage() {
+    return this.message != null;
+  }
+
+  public void setMessageIsSet(boolean value) {
+    if (!value) {
+      this.message = null;
+    }
+  }
+
+  public java.lang.String getStacktrace() {
+    return this.stacktrace;
+  }
+
+  public OperationException setStacktrace(java.lang.String stacktrace) {
+    this.stacktrace = stacktrace;
+    return this;
+  }
+
+  public void unsetStacktrace() {
+    this.stacktrace = null;
+  }
+
+  /** Returns true if field stacktrace is set (has been assigned a value) and false otherwise */
+  public boolean isSetStacktrace() {
+    return this.stacktrace != null;
+  }
+
+  public void setStacktraceIsSet(boolean value) {
+    if (!value) {
+      this.stacktrace = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, java.lang.Object value) {
+    switch (field) {
+    case MESSAGE:
+      if (value == null) {
+        unsetMessage();
+      } else {
+        setMessage((java.lang.String)value);
+      }
+      break;
+
+    case STACKTRACE:
+      if (value == null) {
+        unsetStacktrace();
+      } else {
+        setStacktrace((java.lang.String)value);
+      }
+      break;
+
+    }
+  }
+
+  public java.lang.Object getFieldValue(_Fields field) {
+    switch (field) {
+    case MESSAGE:
+      return getMessage();
+
+    case STACKTRACE:
+      return getStacktrace();
+
+    }
+    throw new java.lang.IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new java.lang.IllegalArgumentException();
+    }
+
+    switch (field) {
+    case MESSAGE:
+      return isSetMessage();
+    case STACKTRACE:
+      return isSetStacktrace();
+    }
+    throw new java.lang.IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(java.lang.Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof OperationException)
+      return this.equals((OperationException)that);
+    return false;
+  }
+
+  public boolean equals(OperationException that) {
+    if (that == null)
+      return false;
+    if (this == that)
+      return true;
+
+    boolean this_present_message = true && this.isSetMessage();
+    boolean that_present_message = true && that.isSetMessage();
+    if (this_present_message || that_present_message) {
+      if (!(this_present_message && that_present_message))
+        return false;
+      if (!this.message.equals(that.message))
+        return false;
+    }
+
+    boolean this_present_stacktrace = true && this.isSetStacktrace();
+    boolean that_present_stacktrace = true && that.isSetStacktrace();
+    if (this_present_stacktrace || that_present_stacktrace) {
+      if (!(this_present_stacktrace && that_present_stacktrace))
+        return false;
+      if (!this.stacktrace.equals(that.stacktrace))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int hashCode = 1;
+
+    hashCode = hashCode * 8191 + ((isSetMessage()) ? 131071 : 524287);
+    if (isSetMessage())
+      hashCode = hashCode * 8191 + message.hashCode();
+
+    hashCode = hashCode * 8191 + ((isSetStacktrace()) ? 131071 : 524287);
+    if (isSetStacktrace())
+      hashCode = hashCode * 8191 + stacktrace.hashCode();
+
+    return hashCode;
+  }
+
+  @Override
+  public int compareTo(OperationException other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = java.lang.Boolean.valueOf(isSetMessage()).compareTo(other.isSetMessage());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetMessage()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.message, other.message);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.valueOf(isSetStacktrace()).compareTo(other.isSetStacktrace());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetStacktrace()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.stacktrace, other.stacktrace);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    scheme(iprot).read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    scheme(oprot).write(oprot, this);
+  }
+
+  @Override
+  public java.lang.String toString() {
+    java.lang.StringBuilder sb = new java.lang.StringBuilder("OperationException(");
+    boolean first = true;
+
+    sb.append("message:");
+    if (this.message == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.message);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("stacktrace:");
+    if (this.stacktrace == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.stacktrace);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class OperationExceptionStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+    public OperationExceptionStandardScheme getScheme() {
+      return new OperationExceptionStandardScheme();
+    }
+  }
+
+  private static class OperationExceptionStandardScheme extends org.apache.thrift.scheme.StandardScheme<OperationException> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, OperationException struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // MESSAGE
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.message = iprot.readString();
+              struct.setMessageIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // STACKTRACE
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.stacktrace = iprot.readString();
+              struct.setStacktraceIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, OperationException struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.message != null) {
+        oprot.writeFieldBegin(MESSAGE_FIELD_DESC);
+        oprot.writeString(struct.message);
+        oprot.writeFieldEnd();
+      }
+      if (struct.stacktrace != null) {
+        oprot.writeFieldBegin(STACKTRACE_FIELD_DESC);
+        oprot.writeString(struct.stacktrace);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class OperationExceptionTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+    public OperationExceptionTupleScheme getScheme() {
+      return new OperationExceptionTupleScheme();
+    }
+  }
+
+  private static class OperationExceptionTupleScheme extends org.apache.thrift.scheme.TupleScheme<OperationException> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, OperationException struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+      java.util.BitSet optionals = new java.util.BitSet();
+      if (struct.isSetMessage()) {
+        optionals.set(0);
+      }
+      if (struct.isSetStacktrace()) {
+        optionals.set(1);
+      }
+      oprot.writeBitSet(optionals, 2);
+      if (struct.isSetMessage()) {
+        oprot.writeString(struct.message);
+      }
+      if (struct.isSetStacktrace()) {
+        oprot.writeString(struct.stacktrace);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, OperationException struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+      java.util.BitSet incoming = iprot.readBitSet(2);
+      if (incoming.get(0)) {
+        struct.message = iprot.readString();
+        struct.setMessageIsSet(true);
+      }
+      if (incoming.get(1)) {
+        struct.stacktrace = iprot.readString();
+        struct.setStacktraceIsSet(true);
+      }
+    }
+  }
+
+  private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+    return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+  }
+}
+
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/stubs/OperationService.java b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/stubs/OperationService.java
new file mode 100644
index 0000000..467daac
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/stubs/OperationService.java
@@ -0,0 +1,1072 @@
+/**
+ * Autogenerated by Thrift Compiler (0.10.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.airavata.agents.thrift.stubs;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.10.0)", date = "2017-12-02")
+public class OperationService {
+
+  public interface Iface {
+
+    public void executeCommand(java.lang.String command, long callbackWorkflowId) throws OperationException, org.apache.thrift.TException;
+
+  }
+
+  public interface AsyncIface {
+
+    public void executeCommand(java.lang.String command, long callbackWorkflowId, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException;
+
+  }
+
+  public static class Client extends org.apache.thrift.TServiceClient implements Iface {
+    public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
+      public Factory() {}
+      public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
+        return new Client(prot);
+      }
+      public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
+        return new Client(iprot, oprot);
+      }
+    }
+
+    public Client(org.apache.thrift.protocol.TProtocol prot)
+    {
+      super(prot, prot);
+    }
+
+    public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
+      super(iprot, oprot);
+    }
+
+    public void executeCommand(java.lang.String command, long callbackWorkflowId) throws OperationException, org.apache.thrift.TException
+    {
+      send_executeCommand(command, callbackWorkflowId);
+      recv_executeCommand();
+    }
+
+    public void send_executeCommand(java.lang.String command, long callbackWorkflowId) throws org.apache.thrift.TException
+    {
+      executeCommand_args args = new executeCommand_args();
+      args.setCommand(command);
+      args.setCallbackWorkflowId(callbackWorkflowId);
+      sendBase("executeCommand", args);
+    }
+
+    public void recv_executeCommand() throws OperationException, org.apache.thrift.TException
+    {
+      executeCommand_result result = new executeCommand_result();
+      receiveBase(result, "executeCommand");
+      if (result.ex != null) {
+        throw result.ex;
+      }
+      return;
+    }
+
+  }
+  public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
+    public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
+      private org.apache.thrift.async.TAsyncClientManager clientManager;
+      private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
+      public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
+        this.clientManager = clientManager;
+        this.protocolFactory = protocolFactory;
+      }
+      public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
+        return new AsyncClient(protocolFactory, clientManager, transport);
+      }
+    }
+
+    public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
+      super(protocolFactory, clientManager, transport);
+    }
+
+    public void executeCommand(java.lang.String command, long callbackWorkflowId, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      executeCommand_call method_call = new executeCommand_call(command, callbackWorkflowId, resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class executeCommand_call extends org.apache.thrift.async.TAsyncMethodCall<Void> {
+      private java.lang.String command;
+      private long callbackWorkflowId;
+      public executeCommand_call(java.lang.String command, long callbackWorkflowId, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.command = command;
+        this.callbackWorkflowId = callbackWorkflowId;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("executeCommand", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        executeCommand_args args = new executeCommand_args();
+        args.setCommand(command);
+        args.setCallbackWorkflowId(callbackWorkflowId);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public Void getResult() throws OperationException, org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new java.lang.IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        return null;
+      }
+    }
+
+  }
+
+  public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
+    private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName());
+    public Processor(I iface) {
+      super(iface, getProcessMap(new java.util.HashMap<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
+    }
+
+    protected Processor(I iface, java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
+      super(iface, getProcessMap(processMap));
+    }
+
+    private static <I extends Iface> java.util.Map<java.lang.String,  org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
+      processMap.put("executeCommand", new executeCommand());
+      return processMap;
+    }
+
+    public static class executeCommand<I extends Iface> extends org.apache.thrift.ProcessFunction<I, executeCommand_args> {
+      public executeCommand() {
+        super("executeCommand");
+      }
+
+      public executeCommand_args getEmptyArgsInstance() {
+        return new executeCommand_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public executeCommand_result getResult(I iface, executeCommand_args args) throws org.apache.thrift.TException {
+        executeCommand_result result = new executeCommand_result();
+        try {
+          iface.executeCommand(args.command, args.callbackWorkflowId);
+        } catch (OperationException ex) {
+          result.ex = ex;
+        }
+        return result;
+      }
+    }
+
+  }
+
+  public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> {
+    private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(AsyncProcessor.class.getName());
+    public AsyncProcessor(I iface) {
+      super(iface, getProcessMap(new java.util.HashMap<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>()));
+    }
+
+    protected AsyncProcessor(I iface, java.util.Map<java.lang.String,  org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase, ?>> processMap) {
+      super(iface, getProcessMap(processMap));
+    }
+
+    private static <I extends AsyncIface> java.util.Map<java.lang.String,  org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase,?>> getProcessMap(java.util.Map<java.lang.String,  org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase, ?>> processMap) {
+      processMap.put("executeCommand", new executeCommand());
+      return processMap;
+    }
+
+    public static class executeCommand<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, executeCommand_args, Void> {
+      public executeCommand() {
+        super("executeCommand");
+      }
+
+      public executeCommand_args getEmptyArgsInstance() {
+        return new executeCommand_args();
+      }
+
+      public org.apache.thrift.async.AsyncMethodCallback<Void> getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new org.apache.thrift.async.AsyncMethodCallback<Void>() { 
+          public void onComplete(Void o) {
+            executeCommand_result result = new executeCommand_result();
+            try {
+              fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+            } catch (org.apache.thrift.transport.TTransportException e) {
+              _LOGGER.error("TTransportException writing to internal frame buffer", e);
+              fb.close();
+            } catch (java.lang.Exception e) {
+              _LOGGER.error("Exception writing to internal frame buffer", e);
+              onError(e);
+            }
+          }
+          public void onError(java.lang.Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TSerializable msg;
+            executeCommand_result result = new executeCommand_result();
+            if (e instanceof OperationException) {
+              result.ex = (OperationException) e;
+              result.setExIsSet(true);
+              msg = result;
+            } else if (e instanceof org.apache.thrift.transport.TTransportException) {
+              _LOGGER.error("TTransportException inside handler", e);
+              fb.close();
+              return;
+            } else if (e instanceof org.apache.thrift.TApplicationException) {
+              _LOGGER.error("TApplicationException inside handler", e);
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TApplicationException)e;
+            } else {
+              _LOGGER.error("Exception inside handler", e);
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+            } catch (java.lang.Exception ex) {
+              _LOGGER.error("Exception writing to internal frame buffer", ex);
+              fb.close();
+            }
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, executeCommand_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException {
+        iface.executeCommand(args.command, args.callbackWorkflowId,resultHandler);
+      }
+    }
+
+  }
+
+  public static class executeCommand_args implements org.apache.thrift.TBase<executeCommand_args, executeCommand_args._Fields>, java.io.Serializable, Cloneable, Comparable<executeCommand_args>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("executeCommand_args");
+
+    private static final org.apache.thrift.protocol.TField COMMAND_FIELD_DESC = new org.apache.thrift.protocol.TField("command", org.apache.thrift.protocol.TType.STRING, (short)1);
+    private static final org.apache.thrift.protocol.TField CALLBACK_WORKFLOW_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("callbackWorkflowId", org.apache.thrift.protocol.TType.I64, (short)2);
+
+    private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new executeCommand_argsStandardSchemeFactory();
+    private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new executeCommand_argsTupleSchemeFactory();
+
+    public java.lang.String command; // required
+    public long callbackWorkflowId; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      COMMAND((short)1, "command"),
+      CALLBACK_WORKFLOW_ID((short)2, "callbackWorkflowId");
+
+      private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+      static {
+        for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // COMMAND
+            return COMMAND;
+          case 2: // CALLBACK_WORKFLOW_ID
+            return CALLBACK_WORKFLOW_ID;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(java.lang.String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final java.lang.String _fieldName;
+
+      _Fields(short thriftId, java.lang.String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public java.lang.String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    private static final int __CALLBACKWORKFLOWID_ISSET_ID = 0;
+    private byte __isset_bitfield = 0;
+    public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.COMMAND, new org.apache.thrift.meta_data.FieldMetaData("command", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      tmpMap.put(_Fields.CALLBACK_WORKFLOW_ID, new org.apache.thrift.meta_data.FieldMetaData("callbackWorkflowId", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+      metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(executeCommand_args.class, metaDataMap);
+    }
+
+    public executeCommand_args() {
+    }
+
+    public executeCommand_args(
+      java.lang.String command,
+      long callbackWorkflowId)
+    {
+      this();
+      this.command = command;
+      this.callbackWorkflowId = callbackWorkflowId;
+      setCallbackWorkflowIdIsSet(true);
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public executeCommand_args(executeCommand_args other) {
+      __isset_bitfield = other.__isset_bitfield;
+      if (other.isSetCommand()) {
+        this.command = other.command;
+      }
+      this.callbackWorkflowId = other.callbackWorkflowId;
+    }
+
+    public executeCommand_args deepCopy() {
+      return new executeCommand_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.command = null;
+      setCallbackWorkflowIdIsSet(false);
+      this.callbackWorkflowId = 0;
+    }
+
+    public java.lang.String getCommand() {
+      return this.command;
+    }
+
+    public executeCommand_args setCommand(java.lang.String command) {
+      this.command = command;
+      return this;
+    }
+
+    public void unsetCommand() {
+      this.command = null;
+    }
+
+    /** Returns true if field command is set (has been assigned a value) and false otherwise */
+    public boolean isSetCommand() {
+      return this.command != null;
+    }
+
+    public void setCommandIsSet(boolean value) {
+      if (!value) {
+        this.command = null;
+      }
+    }
+
+    public long getCallbackWorkflowId() {
+      return this.callbackWorkflowId;
+    }
+
+    public executeCommand_args setCallbackWorkflowId(long callbackWorkflowId) {
+      this.callbackWorkflowId = callbackWorkflowId;
+      setCallbackWorkflowIdIsSet(true);
+      return this;
+    }
+
+    public void unsetCallbackWorkflowId() {
+      __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __CALLBACKWORKFLOWID_ISSET_ID);
+    }
+
+    /** Returns true if field callbackWorkflowId is set (has been assigned a value) and false otherwise */
+    public boolean isSetCallbackWorkflowId() {
+      return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __CALLBACKWORKFLOWID_ISSET_ID);
+    }
+
+    public void setCallbackWorkflowIdIsSet(boolean value) {
+      __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __CALLBACKWORKFLOWID_ISSET_ID, value);
+    }
+
+    public void setFieldValue(_Fields field, java.lang.Object value) {
+      switch (field) {
+      case COMMAND:
+        if (value == null) {
+          unsetCommand();
+        } else {
+          setCommand((java.lang.String)value);
+        }
+        break;
+
+      case CALLBACK_WORKFLOW_ID:
+        if (value == null) {
+          unsetCallbackWorkflowId();
+        } else {
+          setCallbackWorkflowId((java.lang.Long)value);
+        }
+        break;
+
+      }
+    }
+
+    public java.lang.Object getFieldValue(_Fields field) {
+      switch (field) {
+      case COMMAND:
+        return getCommand();
+
+      case CALLBACK_WORKFLOW_ID:
+        return getCallbackWorkflowId();
+
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new java.lang.IllegalArgumentException();
+      }
+
+      switch (field) {
+      case COMMAND:
+        return isSetCommand();
+      case CALLBACK_WORKFLOW_ID:
+        return isSetCallbackWorkflowId();
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(java.lang.Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof executeCommand_args)
+        return this.equals((executeCommand_args)that);
+      return false;
+    }
+
+    public boolean equals(executeCommand_args that) {
+      if (that == null)
+        return false;
+      if (this == that)
+        return true;
+
+      boolean this_present_command = true && this.isSetCommand();
+      boolean that_present_command = true && that.isSetCommand();
+      if (this_present_command || that_present_command) {
+        if (!(this_present_command && that_present_command))
+          return false;
+        if (!this.command.equals(that.command))
+          return false;
+      }
+
+      boolean this_present_callbackWorkflowId = true;
+      boolean that_present_callbackWorkflowId = true;
+      if (this_present_callbackWorkflowId || that_present_callbackWorkflowId) {
+        if (!(this_present_callbackWorkflowId && that_present_callbackWorkflowId))
+          return false;
+        if (this.callbackWorkflowId != that.callbackWorkflowId)
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int hashCode = 1;
+
+      hashCode = hashCode * 8191 + ((isSetCommand()) ? 131071 : 524287);
+      if (isSetCommand())
+        hashCode = hashCode * 8191 + command.hashCode();
+
+      hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(callbackWorkflowId);
+
+      return hashCode;
+    }
+
+    @Override
+    public int compareTo(executeCommand_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = java.lang.Boolean.valueOf(isSetCommand()).compareTo(other.isSetCommand());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetCommand()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.command, other.command);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      lastComparison = java.lang.Boolean.valueOf(isSetCallbackWorkflowId()).compareTo(other.isSetCallbackWorkflowId());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetCallbackWorkflowId()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.callbackWorkflowId, other.callbackWorkflowId);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      scheme(iprot).read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      scheme(oprot).write(oprot, this);
+    }
+
+    @Override
+    public java.lang.String toString() {
+      java.lang.StringBuilder sb = new java.lang.StringBuilder("executeCommand_args(");
+      boolean first = true;
+
+      sb.append("command:");
+      if (this.command == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.command);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("callbackWorkflowId:");
+      sb.append(this.callbackWorkflowId);
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+      try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+        __isset_bitfield = 0;
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class executeCommand_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+      public executeCommand_argsStandardScheme getScheme() {
+        return new executeCommand_argsStandardScheme();
+      }
+    }
+
+    private static class executeCommand_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme<executeCommand_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, executeCommand_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // COMMAND
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.command = iprot.readString();
+                struct.setCommandIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 2: // CALLBACK_WORKFLOW_ID
+              if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+                struct.callbackWorkflowId = iprot.readI64();
+                struct.setCallbackWorkflowIdIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, executeCommand_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.command != null) {
+          oprot.writeFieldBegin(COMMAND_FIELD_DESC);
+          oprot.writeString(struct.command);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldBegin(CALLBACK_WORKFLOW_ID_FIELD_DESC);
+        oprot.writeI64(struct.callbackWorkflowId);
+        oprot.writeFieldEnd();
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class executeCommand_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+      public executeCommand_argsTupleScheme getScheme() {
+        return new executeCommand_argsTupleScheme();
+      }
+    }
+
+    private static class executeCommand_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme<executeCommand_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, executeCommand_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet optionals = new java.util.BitSet();
+        if (struct.isSetCommand()) {
+          optionals.set(0);
+        }
+        if (struct.isSetCallbackWorkflowId()) {
+          optionals.set(1);
+        }
+        oprot.writeBitSet(optionals, 2);
+        if (struct.isSetCommand()) {
+          oprot.writeString(struct.command);
+        }
+        if (struct.isSetCallbackWorkflowId()) {
+          oprot.writeI64(struct.callbackWorkflowId);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, executeCommand_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet incoming = iprot.readBitSet(2);
+        if (incoming.get(0)) {
+          struct.command = iprot.readString();
+          struct.setCommandIsSet(true);
+        }
+        if (incoming.get(1)) {
+          struct.callbackWorkflowId = iprot.readI64();
+          struct.setCallbackWorkflowIdIsSet(true);
+        }
+      }
+    }
+
+    private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+      return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+    }
+  }
+
+  public static class executeCommand_result implements org.apache.thrift.TBase<executeCommand_result, executeCommand_result._Fields>, java.io.Serializable, Cloneable, Comparable<executeCommand_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("executeCommand_result");
+
+    private static final org.apache.thrift.protocol.TField EX_FIELD_DESC = new org.apache.thrift.protocol.TField("ex", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+    private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new executeCommand_resultStandardSchemeFactory();
+    private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new executeCommand_resultTupleSchemeFactory();
+
+    public OperationException ex; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      EX((short)1, "ex");
+
+      private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+      static {
+        for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it's not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // EX
+            return EX;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if it's not found.
+       */
+      public static _Fields findByName(java.lang.String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final java.lang.String _fieldName;
+
+      _Fields(short thriftId, java.lang.String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public java.lang.String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.EX, new org.apache.thrift.meta_data.FieldMetaData("ex", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, OperationException.class)));
+      metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(executeCommand_result.class, metaDataMap);
+    }
+
+    public executeCommand_result() {
+    }
+
+    public executeCommand_result(
+      OperationException ex)
+    {
+      this();
+      this.ex = ex;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public executeCommand_result(executeCommand_result other) {
+      if (other.isSetEx()) {
+        this.ex = new OperationException(other.ex);
+      }
+    }
+
+    public executeCommand_result deepCopy() {
+      return new executeCommand_result(this);
+    }
+
+    @Override
+    public void clear() {
+      this.ex = null;
+    }
+
+    public OperationException getEx() {
+      return this.ex;
+    }
+
+    public executeCommand_result setEx(OperationException ex) {
+      this.ex = ex;
+      return this;
+    }
+
+    public void unsetEx() {
+      this.ex = null;
+    }
+
+    /** Returns true if field ex is set (has been assigned a value) and false otherwise */
+    public boolean isSetEx() {
+      return this.ex != null;
+    }
+
+    public void setExIsSet(boolean value) {
+      if (!value) {
+        this.ex = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, java.lang.Object value) {
+      switch (field) {
+      case EX:
+        if (value == null) {
+          unsetEx();
+        } else {
+          setEx((OperationException)value);
+        }
+        break;
+
+      }
+    }
+
+    public java.lang.Object getFieldValue(_Fields field) {
+      switch (field) {
+      case EX:
+        return getEx();
+
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new java.lang.IllegalArgumentException();
+      }
+
+      switch (field) {
+      case EX:
+        return isSetEx();
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(java.lang.Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof executeCommand_result)
+        return this.equals((executeCommand_result)that);
+      return false;
+    }
+
+    public boolean equals(executeCommand_result that) {
+      if (that == null)
+        return false;
+      if (this == that)
+        return true;
+
+      boolean this_present_ex = true && this.isSetEx();
+      boolean that_present_ex = true && that.isSetEx();
+      if (this_present_ex || that_present_ex) {
+        if (!(this_present_ex && that_present_ex))
+          return false;
+        if (!this.ex.equals(that.ex))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int hashCode = 1;
+
+      hashCode = hashCode * 8191 + ((isSetEx()) ? 131071 : 524287);
+      if (isSetEx())
+        hashCode = hashCode * 8191 + ex.hashCode();
+
+      return hashCode;
+    }
+
+    @Override
+    public int compareTo(executeCommand_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = java.lang.Boolean.valueOf(isSetEx()).compareTo(other.isSetEx());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetEx()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.ex, other.ex);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      scheme(iprot).read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      scheme(oprot).write(oprot, this);
+    }
+
+    @Override
+    public java.lang.String toString() {
+      java.lang.StringBuilder sb = new java.lang.StringBuilder("executeCommand_result(");
+      boolean first = true;
+
+      sb.append("ex:");
+      if (this.ex == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.ex);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class executeCommand_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+      public executeCommand_resultStandardScheme getScheme() {
+        return new executeCommand_resultStandardScheme();
+      }
+    }
+
+    private static class executeCommand_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme<executeCommand_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, executeCommand_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // EX
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.ex = new OperationException();
+                struct.ex.read(iprot);
+                struct.setExIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, executeCommand_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.ex != null) {
+          oprot.writeFieldBegin(EX_FIELD_DESC);
+          struct.ex.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class executeCommand_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+      public executeCommand_resultTupleScheme getScheme() {
+        return new executeCommand_resultTupleScheme();
+      }
+    }
+
+    private static class executeCommand_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme<executeCommand_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, executeCommand_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet optionals = new java.util.BitSet();
+        if (struct.isSetEx()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.isSetEx()) {
+          struct.ex.write(oprot);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, executeCommand_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.ex = new OperationException();
+          struct.ex.read(iprot);
+          struct.setExIsSet(true);
+        }
+      }
+    }
+
+    private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+      return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+    }
+  }
+
+}
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/resources/application.properties b/airavata-kubernetes/modules/agents/thrift-agent/src/main/resources/application.properties
new file mode 100644
index 0000000..bd5b373
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/resources/application.properties
@@ -0,0 +1,2 @@
+kafka.bootstrap.url=localhost:9092
+async.event.listener.topic=async-event-listener
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/resources/schema.thrift b/airavata-kubernetes/modules/agents/thrift-agent/src/main/resources/schema.thrift
new file mode 100644
index 0000000..9ff22d0
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/resources/schema.thrift
@@ -0,0 +1,11 @@
+namespace java org.apache.airavata.agents.thrift.stubs
+
+exception OperationException {
+  1: string message,
+  2: string stacktrace
+}
+
+service OperationService
+{
+        void executeCommand(1:string command, 2:i64 callbackWorkflowId) throws (1:OperationException ex)
+}
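
The schema above is what the checked-in OperationService stubs are generated from. For reference, a minimal client-side sketch of calling executeCommand through those stubs; the host/port (localhost:9295) and the use of a plain socket with binary protocol are assumptions for illustration, not part of this commit.

    // Sketch only: invokes the agent's executeCommand operation via the generated stub.
    // The agent endpoint (localhost:9295) is an assumed value.
    import org.apache.airavata.agents.thrift.stubs.OperationService;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class AgentClientSketch {
        public static void main(String[] args) throws Exception {
            TTransport transport = new TSocket("localhost", 9295);
            transport.open();
            try {
                OperationService.Client client =
                        new OperationService.Client(new TBinaryProtocol(transport));
                // Fire the command; completion is reported asynchronously against
                // the given callback workflow id.
                client.executeCommand("echo hello", 42L);
            } finally {
                transport.close();
            }
        }
    }
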
diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessBootstrapDataResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessBootstrapDataResource.java
new file mode 100644
index 0000000..25f0962
--- /dev/null
+++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessBootstrapDataResource.java
@@ -0,0 +1,40 @@
+package org.apache.airavata.k8s.api.resources.process;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class ProcessBootstrapDataResource {
+    private long id;
+    private String key;
+    private String value;
+
+    public long getId() {
+        return id;
+    }
+
+    public ProcessBootstrapDataResource setId(long id) {
+        this.id = id;
+        return this;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public ProcessBootstrapDataResource setKey(String key) {
+        this.key = key;
+        return this;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public ProcessBootstrapDataResource setValue(String value) {
+        this.value = value;
+        return this;
+    }
+}
diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessResource.java
index b5081cf..fe81007 100644
--- a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessResource.java
+++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessResource.java
@@ -41,6 +41,7 @@ public class ProcessResource {
     private List<ProcessStatusResource> processStatuses = new ArrayList<>();
     private List<TaskResource> tasks = new ArrayList<>();
     private List<Long> processErrorIds = new ArrayList<>();
+    private List<ProcessBootstrapDataResource> processBootstrapData = new ArrayList<>();
     private String taskDag;
     private String experimentDataDir;
     private String processType;
@@ -152,4 +153,13 @@ public class ProcessResource {
         this.workflowId = workflowId;
         return this;
     }
+
+    public List<ProcessBootstrapDataResource> getProcessBootstrapData() {
+        return processBootstrapData;
+    }
+
+    public ProcessResource setProcessBootstrapData(List<ProcessBootstrapDataResource> processBootstrapData) {
+        this.processBootstrapData = processBootstrapData;
+        return this;
+    }
 }
diff --git a/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/AbstractTask.java b/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/AbstractTask.java
index 03589e7..a3ccffb 100644
--- a/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/AbstractTask.java
+++ b/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/AbstractTask.java
@@ -29,9 +29,9 @@ public abstract class AbstractTask extends UserContentStore implements Task {
     public static final String PROCESS_ID = "process_id";
 
     //Configurable values
-    private String apiServerUrl = "localhost:8080";
-    private String kafkaBootstrapUrl = "localhost:9092";
-    private String eventTopic = "airavata-task-event";
+    private String apiServerUrl;
+    private String kafkaBootstrapUrl;
+    private String eventTopic;
 
     private TaskCallbackContext callbackContext;
     private RestTemplate restTemplate;
@@ -39,11 +39,19 @@ public abstract class AbstractTask extends UserContentStore implements Task {
     private long processId;
     private long taskId;
 
-    public AbstractTask(TaskCallbackContext callbackContext) {
+    private PropertyResolver propertyResolver;
+
+    public AbstractTask(TaskCallbackContext callbackContext, PropertyResolver propertyResolver) {
         this.callbackContext = callbackContext;
         this.taskId = Long.parseLong(this.callbackContext.getTaskConfig().getConfigMap().get(TASK_ID));
         this.processId = Long.parseLong(this.callbackContext.getTaskConfig().getConfigMap().get(PROCESS_ID));
         this.restTemplate = new RestTemplate();
+        this.propertyResolver = propertyResolver;
+
+        this.apiServerUrl = getPropertyResolver().get("api.server.url");
+        this.kafkaBootstrapUrl = getPropertyResolver().get("kafka.bootstrap.url");
+        this.eventTopic = getPropertyResolver().get("event.topic");
+
         initializeKafkaEventProducer();
         init();
     }
@@ -135,4 +143,12 @@ public abstract class AbstractTask extends UserContentStore implements Task {
     public long getTaskId() {
         return taskId;
     }
+
+    public Producer<String, String> getEventProducer() {
+        return eventProducer;
+    }
+
+    public PropertyResolver getPropertyResolver() {
+        return propertyResolver;
+    }
 }
diff --git a/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/HelixParticipant.java b/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/HelixParticipant.java
index a2a56ca..ceb8126 100644
--- a/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/HelixParticipant.java
+++ b/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/HelixParticipant.java
@@ -38,12 +38,13 @@ public abstract class HelixParticipant implements Runnable {
     private String taskTypeName;
     private String apiServerUrl;
     private RestTemplate restTemplate;
+    private PropertyResolver propertyResolver;
 
     public HelixParticipant(String propertyFile) throws IOException {
 
         logger.debug("Initializing Participant Node");
 
-        PropertyResolver propertyResolver = new PropertyResolver();
+        this.propertyResolver = new PropertyResolver();
         propertyResolver.loadInputStream(this.getClass().getClassLoader().getResourceAsStream(propertyFile));
 
         this.zkAddress = propertyResolver.get("zookeeper.connection.url");
@@ -137,4 +138,8 @@ public abstract class HelixParticipant implements Runnable {
             zkHelixManager.disconnect();
         }
     }
+
+    public PropertyResolver getPropertyResolver() {
+        return propertyResolver;
+    }
 }
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/WorkflowController.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/WorkflowController.java
index 273a59f..67b4b2a 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/WorkflowController.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/WorkflowController.java
@@ -9,6 +9,7 @@ import org.springframework.web.bind.annotation.*;
 
 import javax.annotation.Resource;
 import java.util.List;
+import java.util.Map;
 
 /**
  * TODO: Class level comments please
@@ -40,7 +41,12 @@ public class WorkflowController {
     }
 
     @GetMapping(path = "{id}/launch", produces = MediaType.APPLICATION_JSON_VALUE)
-    public long launchExperiment(@PathVariable("id") long id) {
-        return this.workflowService.launchWorkflow(id);
+    public long launchWorkflow(@PathVariable("id") long id) {
+        return this.workflowService.launchWorkflow(id, null);
+    }
+
+    @PostMapping(path = "{id}/launch", produces = MediaType.APPLICATION_JSON_VALUE)
+    public long launchWorkflow(@PathVariable("id") long id, @RequestBody Map<String, String> bootstrapData) {
+        return this.workflowService.launchWorkflow(id, bootstrapData);
     }
 }
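
The new POST variant of the launch endpoint accepts an arbitrary key/value map that is persisted as process bootstrap data for the launched workflow (the GET variant simply passes null). A hypothetical caller sketch follows; the base URL, workflow id, and bootstrap keys are assumptions for illustration only.

    // Sketch only: launches workflow 10 with bootstrap data via the new POST endpoint.
    import java.util.HashMap;
    import java.util.Map;
    import org.springframework.web.client.RestTemplate;

    public class WorkflowLaunchSketch {
        public static void main(String[] args) {
            RestTemplate restTemplate = new RestTemplate();
            Map<String, String> bootstrapData = new HashMap<>();
            bootstrapData.put("event", "FILE_ARRIVED");      // assumed key/value
            bootstrapData.put("message", "/data/input.dat"); // assumed key/value
            Long processId = restTemplate.postForObject(
                    "http://localhost:8080/workflow/10/launch", bootstrapData, Long.class);
            System.out.println("Launched process " + processId);
        }
    }
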
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessBootstrapData.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessBootstrapData.java
new file mode 100644
index 0000000..5ed9222
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessBootstrapData.java
@@ -0,0 +1,63 @@
+package org.apache.airavata.k8s.api.server.model.process;
+
+import javax.persistence.*;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Entity
+@Table(name = "PROCESS_BOOTSTRAP_DATA")
+public class ProcessBootstrapData {
+
+    @Id
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    private long id;
+
+    @ManyToOne
+    private ProcessModel processModel;
+
+    @Column(name = "DATA_KEY")
+    private String key;
+
+    @Column(name = "DATA_VALUE")
+    private String value;
+
+    public long getId() {
+        return id;
+    }
+
+    public ProcessBootstrapData setId(long id) {
+        this.id = id;
+        return this;
+    }
+
+    public ProcessModel getProcessModel() {
+        return processModel;
+    }
+
+    public ProcessBootstrapData setProcessModel(ProcessModel processModel) {
+        this.processModel = processModel;
+        return this;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public ProcessBootstrapData setKey(String key) {
+        this.key = key;
+        return this;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public ProcessBootstrapData setValue(String value) {
+        this.value = value;
+        return this;
+    }
+}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessModel.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessModel.java
index 5a4054f..b605b9f 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessModel.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessModel.java
@@ -60,6 +60,9 @@ public class ProcessModel {
     @OneToMany(mappedBy = "parentProcess", cascade = CascadeType.ALL)
     private List<TaskModel> tasks = new ArrayList<>();
 
+    @OneToMany(mappedBy = "processModel", cascade = CascadeType.ALL)
+    private List<ProcessBootstrapData> processBootstrapData = new ArrayList<>();
+
     private String taskDag;
 
     @OneToMany
@@ -177,6 +180,15 @@ public class ProcessModel {
         return this;
     }
 
+    public List<ProcessBootstrapData> getProcessBootstrapData() {
+        return processBootstrapData;
+    }
+
+    public ProcessModel setProcessBootstrapData(List<ProcessBootstrapData> processBootstrapData) {
+        this.processBootstrapData = processBootstrapData;
+        return this;
+    }
+
     public enum ProcessType {
         WORKFLOW, EXPERIMENT;
     }
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/process/ProcessBootstrapDataRepository.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/process/ProcessBootstrapDataRepository.java
new file mode 100644
index 0000000..e79d74e
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/process/ProcessBootstrapDataRepository.java
@@ -0,0 +1,13 @@
+package org.apache.airavata.k8s.api.server.repository.process;
+
+import org.apache.airavata.k8s.api.server.model.process.ProcessBootstrapData;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public interface ProcessBootstrapDataRepository extends CrudRepository<ProcessBootstrapData, Long> {
+}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/ProcessService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/ProcessService.java
index 500e83f..bee190f 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/ProcessService.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/ProcessService.java
@@ -21,9 +21,11 @@ package org.apache.airavata.k8s.api.server.service;
 
 import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
 import org.apache.airavata.k8s.api.server.ServerRuntimeException;
+import org.apache.airavata.k8s.api.server.model.process.ProcessBootstrapData;
 import org.apache.airavata.k8s.api.server.model.process.ProcessModel;
 import org.apache.airavata.k8s.api.server.model.process.ProcessStatus;
 import org.apache.airavata.k8s.api.server.model.task.TaskModel;
+import org.apache.airavata.k8s.api.server.repository.process.ProcessBootstrapDataRepository;
 import org.apache.airavata.k8s.api.server.repository.process.ProcessRepository;
 import org.apache.airavata.k8s.api.resources.process.ProcessResource;
 import org.apache.airavata.k8s.api.server.repository.process.ProcessStatusRepository;
@@ -45,6 +47,7 @@ public class ProcessService {
 
     private ProcessRepository processRepository;
     private ProcessStatusRepository processStatusRepository;
+    private ProcessBootstrapDataRepository bootstrapDataRepository;
 
     private ExperimentService experimentService;
     private TaskService taskService;
@@ -55,13 +58,15 @@ public class ProcessService {
                           ProcessStatusRepository processStatusRepository,
                           ExperimentService experimentService,
                           TaskService taskService,
-                          WorkflowRepository workflowRepository) {
+                          WorkflowRepository workflowRepository,
+                          ProcessBootstrapDataRepository bootstrapDataRepository) {
 
         this.processRepository = processRepository;
         this.processStatusRepository = processStatusRepository;
         this.experimentService = experimentService;
         this.taskService = taskService;
         this.workflowRepository = workflowRepository;
+        this.bootstrapDataRepository = bootstrapDataRepository;
     }
 
     public long create(ProcessResource resource) {
@@ -95,6 +100,14 @@ public class ProcessService {
             taskModel.setId(taskService.create(taskRes));
         }));
 
+        Optional.ofNullable(resource.getProcessBootstrapData()).ifPresent(bootstrapDatas -> bootstrapDatas.forEach(data -> {
+            ProcessBootstrapData bootstrapData = new ProcessBootstrapData();
+            bootstrapData.setProcessModel(saved);
+            bootstrapData.setKey(data.getKey());
+            bootstrapData.setValue(data.getValue());
+            this.bootstrapDataRepository.save(bootstrapData);
+        }));
+
         return saved.getId();
     }
 
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
index 42be414..d76f5e5 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
@@ -1,5 +1,6 @@
 package org.apache.airavata.k8s.api.server.service;
 
+import org.apache.airavata.k8s.api.resources.process.ProcessBootstrapDataResource;
 import org.apache.airavata.k8s.api.resources.process.ProcessResource;
 import org.apache.airavata.k8s.api.resources.workflow.WorkflowResource;
 import org.apache.airavata.k8s.api.server.ServerRuntimeException;
@@ -18,10 +19,7 @@ import org.apache.airavata.k8s.api.server.service.util.ToResourceUtil;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
 
 /**
  * TODO: Class level comments please
@@ -70,15 +68,22 @@ public class WorkflowService {
         return saved.getId();
     }
 
-    public long launchWorkflow(long id) {
+    public long launchWorkflow(long id, Map<String, String> bootstrapData) {
         Workflow workflow = this.workflowRepository.findById(id)
                 .orElseThrow(() -> new ServerRuntimeException("Workflow with id " + id + "can not be found"));
 
+        List<ProcessBootstrapDataResource> bootstrapDataResources = new ArrayList<>();
+
+        if (bootstrapData != null) {
+            bootstrapData.forEach((key, value) ->
+                    bootstrapDataResources.add(new ProcessBootstrapDataResource().setKey(key).setValue(value)));
+        }
 
         long processId = processService.create(new ProcessResource()
                 .setName("Workflow Process : " + workflow.getName() + "-" + UUID.randomUUID().toString())
                 .setCreationTime(System.currentTimeMillis())
                 .setProcessType("WORKFLOW")
+                .setProcessBootstrapData(bootstrapDataResources)
                 .setWorkflowId(id));
 
         try {
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
index 35c7fde..6e61c47 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
@@ -20,6 +20,7 @@
 package org.apache.airavata.k8s.api.server.service.util;
 
 import org.apache.airavata.k8s.api.resources.experiment.ExperimentStatusResource;
+import org.apache.airavata.k8s.api.resources.process.ProcessBootstrapDataResource;
 import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
 import org.apache.airavata.k8s.api.resources.task.*;
 import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
@@ -35,6 +36,7 @@ import org.apache.airavata.k8s.api.server.model.experiment.Experiment;
 import org.apache.airavata.k8s.api.server.model.experiment.ExperimentInputData;
 import org.apache.airavata.k8s.api.server.model.experiment.ExperimentOutputData;
 import org.apache.airavata.k8s.api.server.model.experiment.ExperimentStatus;
+import org.apache.airavata.k8s.api.server.model.process.ProcessBootstrapData;
 import org.apache.airavata.k8s.api.server.model.process.ProcessModel;
 import org.apache.airavata.k8s.api.server.model.process.ProcessStatus;
 import org.apache.airavata.k8s.api.server.model.task.*;
@@ -325,12 +327,27 @@ public class ToResourceUtil {
                     .ifPresent(tasks -> tasks.forEach(task -> processResource.getTasks().add(toResource(task).get())));
             Optional.ofNullable(processModel.getProcessErrors())
                     .ifPresent(errs -> errs.forEach(err -> processResource.getProcessErrorIds().add(err.getId())));
+            Optional.ofNullable(processModel.getProcessBootstrapData())
+                    .ifPresent(datas -> datas.forEach(data -> processResource.getProcessBootstrapData().add(toResource(data).get())));
+
             return Optional.of(processResource);
         } else {
             return Optional.empty();
         }
     }
 
+    public static Optional<ProcessBootstrapDataResource> toResource(ProcessBootstrapData bootstrapData) {
+        if (bootstrapData != null) {
+            ProcessBootstrapDataResource resource = new ProcessBootstrapDataResource();
+            resource.setId(bootstrapData.getId());
+            resource.setKey(bootstrapData.getKey());
+            resource.setValue(bootstrapData.getValue());
+            return Optional.of(resource);
+        } else {
+            return Optional.empty();
+        }
+    }
+
     public static Optional<ProcessStatusResource> toResource(ProcessStatus processStatus) {
         if (processStatus != null) {
             ProcessStatusResource resource = new ProcessStatusResource();
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/pom.xml b/airavata-kubernetes/modules/microservices/async-event-listener/pom.xml
new file mode 100644
index 0000000..7afdbf4
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata-kubernetes</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>async-event-listener</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>api-resource</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-freemarker</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>jar</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <properties>
+                <artifact-packaging>jar</artifact-packaging>
+            </properties>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.springframework.boot</groupId>
+                        <artifactId>spring-boot-maven-plugin</artifactId>
+                        <version>1.4.3.RELEASE</version>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>repackage</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <!-- Create a docker image that runs the executable jar-->
+                    <plugin>
+                        <groupId>com.spotify</groupId>
+                        <artifactId>docker-maven-plugin</artifactId>
+                        <version>1.0.0</version>
+                        <configuration>
+                            <imageName>${docker.image.prefix}/async-event-listener</imageName>
+                            <baseImage>java:openjdk-8-jdk-alpine</baseImage>
+                            <entryPoint>["java","-jar","/${project.build.finalName}.jar"]</entryPoint>
+                            <resources>
+                                <resource>
+                                    <targetPath>/</targetPath>
+                                    <directory>${project.build.directory}</directory>
+                                    <include>${project.build.finalName}.jar</include>
+                                </resource>
+                            </resources>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>build</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+
+        <profile>
+            <id>war</id>
+            <properties>
+                <artifact-packaging>war</artifact-packaging>
+            </properties>
+            <dependencies>
+                <dependency>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-tomcat</artifactId>
+                    <scope>provided</scope>
+                </dependency>
+            </dependencies>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.springframework.boot</groupId>
+                        <artifactId>spring-boot-maven-plugin</artifactId>
+                        <version>1.4.3.RELEASE</version>
+                        <configuration>
+                            <!-- this will get rid of version info from war file name -->
+                            <finalName>async-event-listener</finalName>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/Application.java b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/Application.java
new file mode 100644
index 0000000..aa9b5d7
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/Application.java
@@ -0,0 +1,29 @@
+package org.apache.airavata.async.event.listener;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.web.client.RestTemplateBuilder;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@SpringBootApplication
+@Configuration
+@ComponentScan
+public class Application {
+    public static void main(String[] args) {
+        SpringApplication.run(Application.class, args);
+    }
+
+    @Bean
+    public RestTemplate restTemplate(RestTemplateBuilder builder) {
+        return builder.build();
+    }
+}
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/messaging/KafkaReceiver.java b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/messaging/KafkaReceiver.java
new file mode 100644
index 0000000..c864ae2
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/messaging/KafkaReceiver.java
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.async.event.listener.messaging;
+
+import org.apache.airavata.async.event.listener.service.ListenerService;
+import org.springframework.kafka.annotation.KafkaListener;
+
+import javax.annotation.Resource;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class KafkaReceiver {
+
+    @Resource
+    private ListenerService listenerService;
+
+    @KafkaListener(topics = "${listener.topic.name}", containerFactory = "kafkaListenerContainerFactory")
+    public void receiveProcesses(String payload) {
+        System.out.println("received process=" + payload);
+        long workflowId = Long.parseLong(payload.split(",")[0]);
+        String event = payload.split(",")[1];
+        String message = payload.split(",")[2];
+        listenerService.onEventReceived(workflowId, event, message);
+    }
+//
+//    @KafkaListener(topics = "${task.event.topic.name}", containerFactory = "kafkaEventListenerContainerFactory")
+//    public void receiveTaskEvent(TaskContext taskContext) {
+//        System.out.println("received event for task id =" + taskContext.getTaskId());
+//        workerService.onTaskStateEvent(taskContext);
+//    }
+}
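
The listener above expects each record on the async-event-listener topic to be a comma-separated payload of the form <workflowId>,<event>,<message>. A minimal publisher sketch producing such a record is shown below; the bootstrap server, event name, and message are assumptions for illustration, and the class itself is not part of this commit.

    // Sketch only: publishes a "<workflowId>,<event>,<message>" payload to the topic
    // the async event listener consumes. Values below are assumed for illustration.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class AsyncEventPublisherSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                long workflowId = 42L; // assumed workflow id
                producer.send(new ProducerRecord<>("async-event-listener",
                        workflowId + ",COMMAND_COMPLETED,exit code 0"));
            }
        }
    }
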
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/messaging/ReceiverConfig.java b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/messaging/ReceiverConfig.java
new file mode 100644
index 0000000..2a59c86
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/messaging/ReceiverConfig.java
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.async.event.listener.messaging;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Configuration
+@EnableKafka
+public class ReceiverConfig {
+
+    @Value("${kafka.bootstrap-servers}")
+    private String bootstrapServers;
+
+    @Value("${listener.group.name}")
+    private String listenerGroupName;
+
+    @Bean
+    public Map<String, Object> consumerConfigs() {
+        Map<String, Object> props = new HashMap<>();
+        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        // allows a pool of processes to divide the work of consuming and processing records
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, listenerGroupName);
+        return props;
+    }
+
+    @Bean
+    public ConsumerFactory<String, String> consumerFactory() {
+        return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
+    }
+
+    @Bean
+    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory());
+        return factory;
+    }
+
+    @Bean
+    public KafkaReceiver receiver() {
+        return new KafkaReceiver();
+    }
+}
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/service/ListenerService.java b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/service/ListenerService.java
new file mode 100644
index 0000000..72f5309
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/service/ListenerService.java
@@ -0,0 +1,35 @@
+package org.apache.airavata.async.event.listener.service;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * TODO: Class level comments please
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Service
+public class ListenerService {
+
+    private final RestTemplate restTemplate;
+
+    @Value("${api.server.url}")
+    private String apiServerUrl;
+
+    public ListenerService(RestTemplate restTemplate) {
+        this.restTemplate = restTemplate;
+    }
+
+    public void onEventReceived(long workflowId, String event, String message) {
+        Map<String, String> bootstrapData = new HashMap<>();
+        bootstrapData.put("event", event);
+        bootstrapData.put("message", message);
+        this.restTemplate.postForObject(apiServerUrl + "/workflow/" + workflowId + "/launch", bootstrapData, Long.class);
+    }
+}
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/resources/application.properties
new file mode 100644
index 0000000..f4f3812
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/resources/application.properties
@@ -0,0 +1,4 @@
+server.port = 9195
+listener.topic.name=async-event-listener
+listener.group.name=async-event-listener
+api.server.url=api-server.default.svc.cluster.local:8080
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/src/main/resources/application.yml b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/resources/application.yml
new file mode 100644
index 0000000..069dd61
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/resources/application.yml
@@ -0,0 +1,4 @@
+kafka:
+  bootstrap-servers: kafka.default.svc.cluster.local:9092
+  topic:
+    helloworld: helloworld.t
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/async-command-task/pom.xml b/airavata-kubernetes/modules/microservices/tasks/async-command-task/pom.xml
new file mode 100644
index 0000000..a3a7669
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/tasks/async-command-task/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata-kubernetes</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>../../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>async-command-task</artifactId>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.airavata.helix.task.async.command.Participant</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <!-- Create a docker image that runs the executable jar-->
+            <plugin>
+                <groupId>com.spotify</groupId>
+                <artifactId>docker-maven-plugin</artifactId>
+                <version>1.0.0</version>
+                <configuration>
+                    <imageName>${docker.image.prefix}/async-command-task</imageName>
+                    <baseImage>java:openjdk-8-jdk-alpine</baseImage>
+                    <exposes>
+                        <expose>8080</expose>
+                    </exposes>
+                    <entryPoint>["java","-jar","/${project.build.finalName}-jar-with-dependencies.jar"]</entryPoint>
+                    <resources>
+                        <resource>
+                            <targetPath>/</targetPath>
+                            <directory>${project.build.directory}</directory>
+                            <include>${project.build.finalName}-jar-with-dependencies.jar</include>
+                        </resource>
+                    </resources>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>build</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>helix-task-api</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.helix</groupId>
+            <artifactId>helix-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>agent-core</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>thrift-agent</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>api-resource</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
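
For local builds, the assembly plugin produces the jar-with-dependencies during package, and the Spotify plugin can then build the image with "mvn package docker:build" (assuming a reachable Docker daemon and a docker.image.prefix property defined by the parent build).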
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/java/org/apache/airavata/helix/task/async/command/AsyncCommandTask.java
similarity index 68%
copy from airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
copy to airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/java/org/apache/airavata/helix/task/async/command/AsyncCommandTask.java
index 866c2c7..5ece6f2 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
+++ b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/java/org/apache/airavata/helix/task/async/command/AsyncCommandTask.java
@@ -1,15 +1,18 @@
-package org.apache.airavata.helix.task.command;
+package org.apache.airavata.helix.task.async.command;
 
+import org.apache.airavata.agents.core.AsyncOperation;
 import org.apache.airavata.helix.api.AbstractTask;
+import org.apache.airavata.helix.api.PropertyResolver;
 import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
 import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
 import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
 import org.apache.airavata.k8s.api.resources.task.type.TaskOutPortTypeResource;
 import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
-import org.apache.airavata.k8s.compute.api.ExecutionResult;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskResult;
+import org.apache.kafka.clients.producer.ProducerRecord;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
 
 /**
@@ -18,9 +21,9 @@ import java.util.Arrays;
  * @author dimuthu
  * @since 1.0.0-SNAPSHOT
  */
-public class CommandTask extends AbstractTask {
+public class AsyncCommandTask extends AbstractTask {
 
-    public static final String NAME = "COMMAND";
+    public static final String NAME = "ASYNC_COMMAND_TASK";
 
     private String command;
     private String arguments;
@@ -28,9 +31,10 @@ public class CommandTask extends AbstractTask {
     private String stdErrPath;
     private String computeResourceId;
     private ComputeResource computeResource;
+    private Long callBackWorkflowId;
 
-    public CommandTask(TaskCallbackContext callbackContext) {
-        super(callbackContext);
+    public AsyncCommandTask(TaskCallbackContext callbackContext, PropertyResolver propertyResolver) {
+        super(callbackContext, propertyResolver);
     }
 
     @Override
@@ -40,48 +44,28 @@ public class CommandTask extends AbstractTask {
         this.stdOutPath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.STD_OUT_PATH);
         this.stdErrPath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.STD_ERR_PATH);
         this.computeResourceId = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.COMPUTE_RESOURCE);
+        this.callBackWorkflowId = Long.parseLong(getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.CALLBACK_WORKFLOW));
         this.computeResource = this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
                 + "/compute/" + Long.parseLong(this.computeResourceId), ComputeResource.class);
+
     }
 
+    @Override
     public TaskResult onRun() {
-        System.out.println("Executing command " + command);
         try {
-
-            String stdOutSuffix = " > " + stdOutPath + " 2> " + stdErrPath;
-
-            publishTaskStatus(TaskStatusResource.State.EXECUTING, "");
-
-            String finalCommand = command + (arguments != null ? arguments : "") + stdOutSuffix;
-
-            System.out.println("Executing command " + finalCommand);
-            Thread.sleep(200000);
-            ExecutionResult executionResult = fetchComputeResourceOperation(computeResource).executeCommand(finalCommand);
-
-            if (executionResult.getExitStatus() == 0) {
-                publishTaskStatus(TaskStatusResource.State.COMPLETED, "Task completed");
-                sendToOutPort("Out");
-                return new TaskResult(TaskResult.Status.COMPLETED, "Task completed");
-
-            } else if (executionResult.getExitStatus() == -1) {
-                publishTaskStatus(TaskStatusResource.State.FAILED, "Process didn't exit successfully");
-                sendToOutPort("Error");
-                return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
-
-            } else {
-                publishTaskStatus(TaskStatusResource.State.FAILED, "Process exited with error status " + executionResult.getExitStatus());
-                sendToOutPort("Error");
-                return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
-            }
-
-        } catch (Exception e) {
-
+            AsyncOperation operation = (AsyncOperation) Class.forName("org.apache.airavata.agents.thrift.operation.ThriftAgentOperation")
+                    .getConstructor(ComputeResource.class).newInstance(this.computeResource);
+            operation.executeCommandAsync(this.command, this.callBackWorkflowId);
+            return new TaskResult(TaskResult.Status.COMPLETED, "Task completed");
+        } catch (InstantiationException | IllegalAccessException |
+                InvocationTargetException | ClassNotFoundException | NoSuchMethodException | ClassCastException e) {
             e.printStackTrace();
-            publishTaskStatus(TaskStatusResource.State.FAILED, e.getMessage());
+            publishTaskStatus(TaskStatusResource.State.FAILED, "Failed to load async operation");
             return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
         }
     }
 
+    @Override
     public void onCancel() {
 
     }
@@ -89,7 +73,6 @@ public class CommandTask extends AbstractTask {
     public static TaskTypeResource getTaskType() {
         TaskTypeResource taskTypeResource = new TaskTypeResource();
         taskTypeResource.setName(NAME);
-        taskTypeResource.setTopicName("airavata-command");
         taskTypeResource.setIcon("assets/icons/ssh.png");
         taskTypeResource.getInputTypes().addAll(
                 Arrays.asList(
@@ -106,6 +89,10 @@ public class CommandTask extends AbstractTask {
                                 .setType("Long")
                                 .setDefaultValue(""),
                         new TaskInputTypeResource()
+                                .setName(PARAMS.CALLBACK_WORKFLOW)
+                                .setType("Long")
+                                .setDefaultValue(""),
+                        new TaskInputTypeResource()
                                 .setName(PARAMS.STD_OUT_PATH)
                                 .setType("String")
                                 .setDefaultValue(""),
@@ -114,6 +101,7 @@ public class CommandTask extends AbstractTask {
                                 .setType("String")
                                 .setDefaultValue("")));
 
+
         taskTypeResource.getOutPorts().addAll(
                 Arrays.asList(
                         new TaskOutPortTypeResource()
@@ -133,5 +121,6 @@ public class CommandTask extends AbstractTask {
         public static final String STD_OUT_PATH = "std_out_path";
         public static final String STD_ERR_PATH = "std_err_path";
         public static final String COMPUTE_RESOURCE = "compute_resource";
+        public static final String CALLBACK_WORKFLOW = "callback_workflow";
     }
 }
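
The task resolves its agent operation reflectively, so any implementation only needs a public constructor taking a ComputeResource and an executeCommandAsync method matching the call site above. Below is a minimal sketch of such an implementation; LocalEchoOperation and the executeCommandAsync(String, long) signature are assumptions for illustration, since the authoritative AsyncOperation contract lives in agent-core.

    package org.example.agents;

    import org.apache.airavata.agents.core.AsyncOperation;
    import org.apache.airavata.k8s.api.resources.compute.ComputeResource;

    // Illustrative only: logs the command instead of dispatching it to a remote agent.
    public class LocalEchoOperation implements AsyncOperation {

        private final ComputeResource computeResource;

        // Constructor shape required by AsyncCommandTask's reflective lookup:
        // getConstructor(ComputeResource.class).newInstance(computeResource)
        public LocalEchoOperation(ComputeResource computeResource) {
            this.computeResource = computeResource;
        }

        public void executeCommandAsync(String command, long callbackWorkflowId) {
            // A real implementation would submit the command asynchronously and arrange
            // for the callback workflow to be launched when the command finishes.
            System.out.println("Would run '" + command + "' and notify workflow " + callbackWorkflowId);
        }
    }

Note that AsyncCommandTask hard-codes the class name org.apache.airavata.agents.thrift.operation.ThriftAgentOperation, so swapping in a different operation would also require changing that string (or making it configurable).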
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/java/org/apache/airavata/helix/task/async/command/Participant.java
similarity index 84%
copy from airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
copy to airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/java/org/apache/airavata/helix/task/async/command/Participant.java
index 3b7e55e..57bb26c 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
+++ b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/java/org/apache/airavata/helix/task/async/command/Participant.java
@@ -1,4 +1,4 @@
-package org.apache.airavata.helix.task.command;
+package org.apache.airavata.helix.task.async.command;
 
 import org.apache.airavata.helix.api.HelixParticipant;
 import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
@@ -18,7 +18,6 @@ import java.util.Map;
  */
 public class Participant extends HelixParticipant {
 
-
     public Participant(String propertyFile) throws IOException {
         super(propertyFile);
     }
@@ -30,18 +29,18 @@ public class Participant extends HelixParticipant {
         TaskFactory commandTaskFac = new TaskFactory() {
             @Override
             public Task createNewTask(TaskCallbackContext context) {
-                return new CommandTask(context);
+                return new AsyncCommandTask(context, getPropertyResolver());
             }
         };
 
-        taskRegistry.put(CommandTask.NAME, commandTaskFac);
+        taskRegistry.put(AsyncCommandTask.NAME, commandTaskFac);
 
         return taskRegistry;
     }
 
     @Override
     public TaskTypeResource getTaskType() {
-        return CommandTask.getTaskType();
+        return AsyncCommandTask.getTaskType();
     }
 
     public static void main(String args[]) {
diff --git a/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/resources/application.properties
new file mode 100644
index 0000000..78ef57d
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/resources/application.properties
@@ -0,0 +1,8 @@
+api.server.url=api-server.default.svc.cluster.local:8080
+zookeeper.connection.url=localhost:2199
+helix.cluster.name=AiravataDemoCluster
+participant.name=async-command-p1
+task.type.name=ASYNC_COMMAND_TASK
+kafka.bootstrap.url=localhost:9092
+event.topic=airavata-task-event
+async.event.listener.topic=async-event-listener
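
These properties are read when the participant process starts. Independent of the project's PropertyResolver (whose API is not shown in this commit), a plain java.util.Properties load illustrates what the file provides; ConfigSketch is a hypothetical class name.

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    // Framework-free sketch of reading the participant configuration above.
    public class ConfigSketch {
        public static void main(String[] args) throws IOException {
            Properties props = new Properties();
            try (InputStream in = ConfigSketch.class.getResourceAsStream("/application.properties")) {
                props.load(in);
            }
            System.out.println("Helix cluster: " + props.getProperty("helix.cluster.name"));
            System.out.println("Task type:     " + props.getProperty("task.type.name"));
            System.out.println("Kafka:         " + props.getProperty("kafka.bootstrap.url"));
        }
    }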
diff --git a/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/resources/log4j.properties b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/resources/log4j.properties
new file mode 100644
index 0000000..5e31e3c
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
index 866c2c7..d5b8bda 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
+++ b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
@@ -1,6 +1,7 @@
 package org.apache.airavata.helix.task.command;
 
 import org.apache.airavata.helix.api.AbstractTask;
+import org.apache.airavata.helix.api.PropertyResolver;
 import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
 import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
 import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
@@ -29,8 +30,8 @@ public class CommandTask extends AbstractTask {
     private String computeResourceId;
     private ComputeResource computeResource;
 
-    public CommandTask(TaskCallbackContext callbackContext) {
-        super(callbackContext);
+    public CommandTask(TaskCallbackContext callbackContext, PropertyResolver propertyResolver) {
+        super(callbackContext, propertyResolver);
     }
 
     @Override
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
index 3b7e55e..289135d 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
+++ b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
@@ -30,7 +30,7 @@ public class Participant extends HelixParticipant {
         TaskFactory commandTaskFac = new TaskFactory() {
             @Override
             public Task createNewTask(TaskCallbackContext context) {
-                return new CommandTask(context);
+                return new CommandTask(context, getPropertyResolver());
             }
         };
 
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/resources/application.properties
index 92292ab..73a9b9c 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/resources/application.properties
+++ b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/resources/application.properties
@@ -3,3 +3,5 @@ zookeeper.connection.url=localhost:2199
 helix.cluster.name=AiravataDemoCluster
 participant.name=command-p2
 task.type.name=COMMAND
+kafka.bootstrap.url=localhost:9092
+event.topic=airavata-task-event
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/DataInputTask.java b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/DataInputTask.java
index 00aeadc..cc6af6b 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/DataInputTask.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/DataInputTask.java
@@ -1,6 +1,7 @@
 package org.apache.airavata.helix.task.datain;
 
 import org.apache.airavata.helix.api.AbstractTask;
+import org.apache.airavata.helix.api.PropertyResolver;
 import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
 import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
 import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
@@ -30,8 +31,8 @@ public class DataInputTask extends AbstractTask {
     private String computeResourceId;
     private ComputeResource computeResource;
 
-    public DataInputTask(TaskCallbackContext callbackContext) {
-        super(callbackContext);
+    public DataInputTask(TaskCallbackContext callbackContext, PropertyResolver propertyResolver) {
+        super(callbackContext, propertyResolver);
     }
 
     @Override
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/Participant.java b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/Participant.java
index d804352..498dfee 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/Participant.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/Participant.java
@@ -30,7 +30,7 @@ public class Participant extends HelixParticipant {
         TaskFactory dataInTask = new TaskFactory() {
             @Override
             public Task createNewTask(TaskCallbackContext context) {
-                return new DataInputTask(context);
+                return new DataInputTask(context, getPropertyResolver());
             }
         };
 
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/resources/application.properties
index 3f98946..acf5a9b 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/resources/application.properties
+++ b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/resources/application.properties
@@ -2,4 +2,6 @@ api.server.url=api-server.default.svc.cluster.local:8080
 zookeeper.connection.url=localhost:2199
 helix.cluster.name=AiravataDemoCluster
 participant.name=data-in-p1
-task.type.name=DATA_INPUT
\ No newline at end of file
+task.type.name=DATA_INPUT
+kafka.bootstrap.url=localhost:9092
+event.topic=airavata-task-event
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/DataOutputTask.java b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/DataOutputTask.java
index ad6123f..0fe8fbe 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/DataOutputTask.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/DataOutputTask.java
@@ -1,6 +1,7 @@
 package org.apache.airavata.helix.task.dataout;
 
 import org.apache.airavata.helix.api.AbstractTask;
+import org.apache.airavata.helix.api.PropertyResolver;
 import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
 import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
 import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
@@ -33,8 +34,8 @@ public class DataOutputTask extends AbstractTask {
     private String computeResourceId;
     private ComputeResource computeResource;
 
-    public DataOutputTask(TaskCallbackContext callbackContext) {
-        super(callbackContext);
+    public DataOutputTask(TaskCallbackContext callbackContext, PropertyResolver propertyResolver) {
+        super(callbackContext, propertyResolver);
     }
 
     @Override
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/Participant.java b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/Participant.java
index a26c654..1fe9fe6 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/Participant.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/Participant.java
@@ -29,7 +29,7 @@ public class Participant extends HelixParticipant {
         TaskFactory dataInTask = new TaskFactory() {
             @Override
             public Task createNewTask(TaskCallbackContext context) {
-                return new DataOutputTask(context);
+                return new DataOutputTask(context, getPropertyResolver());
             }
         };
 
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/resources/application.properties
index 37109d6..2e10b26 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/resources/application.properties
+++ b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/resources/application.properties
@@ -2,4 +2,6 @@ api.server.url=api-server.default.svc.cluster.local:8080
 zookeeper.connection.url=localhost:2199
 helix.cluster.name=AiravataDemoCluster
 participant.name=data-out-p1
-task.type.name=DATA_OUTPUT
\ No newline at end of file
+task.type.name=DATA_OUTPUT
+kafka.bootstrap.url=localhost:9092
+event.topic=airavata-task-event
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
index 4fe8579..37ca403 100644
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
+++ b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
@@ -1,5 +1,6 @@
 package org.apache.airavata.k8s.gfac.core;
 
+import org.apache.airavata.k8s.api.resources.process.ProcessBootstrapDataResource;
 import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
 import org.apache.airavata.k8s.api.resources.task.TaskResource;
 import org.apache.helix.HelixManagerFactory;
@@ -26,6 +27,7 @@ public class HelixWorkflowManager {
 
     private long processId;
     private List<TaskResource> tasks;
+    private List<ProcessBootstrapDataResource> bootstrapData;
 
     // out port id, next task id
     private Map<Long, Long> edgeMap;
@@ -38,9 +40,9 @@ public class HelixWorkflowManager {
     private String helixClusterName;
     private String instanceName;
 
-    public HelixWorkflowManager(long processId, List<TaskResource> tasks, Map<Long, Long> edgeMap,
-                                RestTemplate restTemplate, String apiServerUrl, String zkConnectionString,
-                                String helixClusterName, String instanceName) {
+    public HelixWorkflowManager(long processId, List<TaskResource> tasks, List<ProcessBootstrapDataResource> bootstrapData,
+                                Map<Long, Long> edgeMap, RestTemplate restTemplate, String apiServerUrl,
+                                String zkConnectionString, String helixClusterName, String instanceName) {
         this.processId = processId;
         this.tasks = tasks;
         this.edgeMap = edgeMap;
@@ -49,6 +51,7 @@ public class HelixWorkflowManager {
         this.zkConnectionString = zkConnectionString;
         this.helixClusterName = helixClusterName;
         this.instanceName = instanceName;
+        this.bootstrapData = bootstrapData;
     }
 
     public void launchWorkflow() {
@@ -57,7 +60,7 @@ public class HelixWorkflowManager {
 
         try {
             updateProcessStatus(ProcessStatusResource.State.CREATED);
-            Workflow.Builder workflowBuilder = createWorkflow();
+            Workflow.Builder workflowBuilder = createWorkflow(this.bootstrapData);
             WorkflowConfig.Builder config = new WorkflowConfig.Builder().setFailureThreshold(0);
             workflowBuilder.setWorkflowConfig(config.build());
             if (workflowBuilder == null) {
@@ -94,11 +97,11 @@ public class HelixWorkflowManager {
         }
     }
 
-    private Workflow.Builder createWorkflow() {
+    private Workflow.Builder createWorkflow(List<ProcessBootstrapDataResource> bootstrapData) {
         Optional<TaskResource> startingTask = tasks.stream().filter(TaskResource::isStartingTask).findFirst();
         if (startingTask.isPresent()) {
             Workflow.Builder workflow = new Workflow.Builder("Airavata_Process_" + processId).setExpiry(0);
-            createWorkflowRecursively(startingTask.get(), workflow, null);
+            createWorkflowRecursively(startingTask.get(), workflow, null, bootstrapData);
             return workflow;
         } else {
             System.out.println("No starting task for this process " + processId);
@@ -107,7 +110,8 @@ public class HelixWorkflowManager {
         }
     }
 
-    private void createWorkflowRecursively(TaskResource taskResource, Workflow.Builder workflow, Long parentTaskId) {
+    private void createWorkflowRecursively(TaskResource taskResource, Workflow.Builder workflow, Long parentTaskId,
+                                           List<ProcessBootstrapDataResource> bootstrapData) {
 
         TaskConfig.Builder taskBuilder = new TaskConfig.Builder().setTaskId("Task_" + taskResource.getId())
                 .setCommand(taskResource.getTaskType().getName());
@@ -119,6 +123,10 @@ public class HelixWorkflowManager {
         taskBuilder.addConfig("task_id", taskResource.getId() + "");
         taskBuilder.addConfig("process_id", taskResource.getParentProcessId() + "");
 
+        Optional.ofNullable(bootstrapData).ifPresent(data -> {
+            data.forEach(d -> taskBuilder.addConfig(d.getKey(), d.getValue()));
+        });
+
         Optional.ofNullable(taskResource.getOutPorts()).ifPresent(outPorts -> outPorts.forEach(outPort -> {
             Optional.ofNullable(edgeMap.get(outPort.getId())).ifPresent(nextTask -> {
                 Optional<TaskResource> nextTaskResource = tasks.stream().filter(task -> task.getId() == nextTask).findFirst();
@@ -148,7 +156,7 @@ public class HelixWorkflowManager {
                 Optional<TaskResource> nextTaskResource = tasks.stream().filter(task -> task.getId() == nextTask).findFirst();
                 nextTaskResource.ifPresent(t -> {
 
-                    createWorkflowRecursively(t, workflow, taskResource.getId());
+                    createWorkflowRecursively(t, workflow, taskResource.getId(), null);
                 });
             });
         }));
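
With this change, every bootstrap key/value pair supplied when the workflow is launched is copied into the task configuration, so tasks can read it from their config map alongside task_id and process_id. A stand-alone sketch of that copying step follows, assuming the usual TaskConfig.Builder.build() terminal call; BootstrapConfigSketch is a hypothetical class name.

    import java.util.List;

    import org.apache.airavata.k8s.api.resources.process.ProcessBootstrapDataResource;
    import org.apache.helix.task.TaskConfig;

    public class BootstrapConfigSketch {

        // Mirrors the addition to createWorkflowRecursively: bootstrap entries become
        // per-task config values keyed by ProcessBootstrapDataResource.getKey().
        static TaskConfig buildTaskConfig(long taskId, String taskTypeName,
                                          List<ProcessBootstrapDataResource> bootstrapData) {
            TaskConfig.Builder builder = new TaskConfig.Builder()
                    .setTaskId("Task_" + taskId)
                    .setCommand(taskTypeName);
            if (bootstrapData != null) {
                bootstrapData.forEach(d -> builder.addConfig(d.getKey(), d.getValue()));
            }
            return builder.build();
        }
    }

Note that the recursive calls above pass null for the bootstrap data, so only the starting task's configuration receives these entries.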
diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
index 888f469..2ea84de 100644
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
+++ b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
@@ -85,7 +85,8 @@ public class WorkerService {
 
         //processLifecycleStore.put(processId, manager);
 
-        final HelixWorkflowManager helixWorkflowManager = new HelixWorkflowManager(processId, taskResources, edgeMap,
+        final HelixWorkflowManager helixWorkflowManager = new HelixWorkflowManager(processId, taskResources,
+                processResource.getProcessBootstrapData(), edgeMap,
                 restTemplate, apiServerUrl,
                 zkConnectionString, helixClusterName, instanceName);
 
diff --git a/airavata-kubernetes/pom.xml b/airavata-kubernetes/pom.xml
index 30b9226..9ca584f 100644
--- a/airavata-kubernetes/pom.xml
+++ b/airavata-kubernetes/pom.xml
@@ -41,6 +41,10 @@
         <module>modules/microservices/tasks/command-task</module>
         <module>modules/microservices/tasks/data-in-task</module>
         <module>modules/microservices/tasks/data-out-task</module>
+        <module>modules/agents/agent-core</module>
+        <module>modules/agents/thrift-agent</module>
+        <module>modules/microservices/tasks/async-command-task</module>
+        <module>modules/microservices/async-event-listener</module>
     </modules>
 
     <dependencyManagement>

-- 
To stop receiving notification emails like this one, please contact
"commits@airavata.apache.org" <co...@airavata.apache.org>.