You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2017/12/06 03:14:02 UTC
[airavata-sandbox] 18/19: Initial agent implementation
This is an automated email from the ASF dual-hosted git repository.
smarru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-sandbox.git
commit e42d3f65f1d326b06d106f529f4e4144aab051cb
Author: dimuthu.upeksha2@gmail.com <Di...@1234>
AuthorDate: Sat Dec 2 23:00:56 2017 +0530
Initial agent implementation
---
.../modules/agents/agent-core/pom.xml | 40 +
.../airavata/agents/core/AsyncOperation.java | 24 +
.../airavata/agents/core/StatusPublisher.java | 49 +
.../modules/agents/thrift-agent/pom.xml | 47 +
.../agents/thrift/handler/OperationHandler.java | 34 +
.../thrift/operation/ThriftAgentOperation.java | 45 +
.../agents/thrift/server/OperationServer.java | 51 +
.../agents/thrift/stubs/OperationException.java | 476 +++++++++
.../agents/thrift/stubs/OperationService.java | 1072 ++++++++++++++++++++
.../src/main/resources/application.properties | 2 +
.../thrift-agent/src/main/resources/schema.thrift | 11 +
.../process/ProcessBootstrapDataResource.java | 40 +
.../k8s/api/resources/process/ProcessResource.java | 10 +
.../apache/airavata/helix/api/AbstractTask.java | 24 +-
.../airavata/helix/api/HelixParticipant.java | 7 +-
.../api/server/controller/WorkflowController.java | 10 +-
.../server/model/process/ProcessBootstrapData.java | 63 ++
.../k8s/api/server/model/process/ProcessModel.java | 12 +
.../process/ProcessBootstrapDataRepository.java | 13 +
.../k8s/api/server/service/ProcessService.java | 15 +-
.../k8s/api/server/service/WorkflowService.java | 15 +-
.../api/server/service/util/ToResourceUtil.java | 17 +
.../microservices/async-event-listener/pom.xml | 129 +++
.../airavata/async/event/listener/Application.java | 29 +
.../event/listener/messaging/KafkaReceiver.java | 52 +
.../event/listener/messaging/ReceiverConfig.java | 82 ++
.../event/listener/service/ListenerService.java | 35 +
.../src/main/resources/application.properties | 4 +
.../src/main/resources/application.yml | 4 +
.../microservices/tasks/async-command-task/pom.xml | 118 +++
.../task/async/command/AsyncCommandTask.java} | 65 +-
.../helix/task/async}/command/Participant.java | 9 +-
.../src/main/resources/application.properties | 8 +
.../src/main/resources/log4j.properties | 9 +
.../airavata/helix/task/command/CommandTask.java | 5 +-
.../airavata/helix/task/command/Participant.java | 2 +-
.../src/main/resources/application.properties | 2 +
.../airavata/helix/task/datain/DataInputTask.java | 5 +-
.../airavata/helix/task/datain/Participant.java | 2 +-
.../src/main/resources/application.properties | 4 +-
.../helix/task/dataout/DataOutputTask.java | 5 +-
.../airavata/helix/task/dataout/Participant.java | 2 +-
.../src/main/resources/application.properties | 4 +-
.../k8s/gfac/core/HelixWorkflowManager.java | 24 +-
.../airavata/k8s/gfac/service/WorkerService.java | 3 +-
airavata-kubernetes/pom.xml | 4 +
46 files changed, 2607 insertions(+), 76 deletions(-)
diff --git a/airavata-kubernetes/modules/agents/agent-core/pom.xml b/airavata-kubernetes/modules/agents/agent-core/pom.xml
new file mode 100644
index 0000000..e8ce15d
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/agent-core/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Maven module "agent-core": a child of the airavata-kubernetes parent POM that is
+ built with the Java level taken from the java.version property.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-kubernetes</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>agent-core</artifactId>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <!-- java.version is not defined in this file; presumably inherited from the parent POM (confirm). -->
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>api-resource</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <!-- No version is declared here; presumably supplied by dependency management in the parent POM (confirm). -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/agents/agent-core/src/main/java/org/apache/airavata/agents/core/AsyncOperation.java b/airavata-kubernetes/modules/agents/agent-core/src/main/java/org/apache/airavata/agents/core/AsyncOperation.java
new file mode 100644
index 0000000..cc488a0
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/agent-core/src/main/java/org/apache/airavata/agents/core/AsyncOperation.java
@@ -0,0 +1,24 @@
+package org.apache.airavata.agents.core;
+
+import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
+
+/**
+ * Base class for asynchronous command execution against a compute resource.
+ *
+ * <p>Each instance is bound to one {@link ComputeResource}, supplied at construction
+ * and exposed through {@link #getComputeResource()}. Subclasses decide how a command
+ * is delivered and executed.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public abstract class AsyncOperation {
+
+    /** Compute resource this operation targets; assigned once in the constructor. */
+    private final ComputeResource computeResource;
+
+    /**
+     * Binds the operation to a compute resource.
+     *
+     * @param computeResource resource the commands are executed against; stored as given
+     */
+    public AsyncOperation(ComputeResource computeResource) {
+        this.computeResource = computeResource;
+    }
+
+    /**
+     * Submits a command for asynchronous execution.
+     *
+     * @param command            command to execute
+     * @param callbackWorkflowId identifier handed over together with the command;
+     *                           presumably the workflow to notify about status changes
+     *                           (TODO confirm against callers)
+     */
+    public abstract void executeCommandAsync(String command, long callbackWorkflowId);
+
+    /**
+     * Returns the compute resource given at construction.
+     *
+     * @return the compute resource this operation is bound to
+     */
+    public ComputeResource getComputeResource() {
+        return computeResource;
+    }
+}
diff --git a/airavata-kubernetes/modules/agents/agent-core/src/main/java/org/apache/airavata/agents/core/StatusPublisher.java b/airavata-kubernetes/modules/agents/agent-core/src/main/java/org/apache/airavata/agents/core/StatusPublisher.java
new file mode 100644
index 0000000..71af84e
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/agent-core/src/main/java/org/apache/airavata/agents/core/StatusPublisher.java
@@ -0,0 +1,49 @@
+package org.apache.airavata.agents.core;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+/**
+ * Publishes workflow status updates to a Kafka topic.
+ *
+ * <p>Every update is sent as one un-keyed string record of the form
+ * {@code <callbackWorkflowId>,<status>,<message>}.
+ *
+ * <p>NOTE(review): the fields are joined with plain commas and nothing is escaped, so a
+ * message that itself contains a comma cannot be told apart from the field separator.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class StatusPublisher implements AutoCloseable {
+    private final String brokerUrl;
+    private final String topicName;
+
+    private Producer<String, String> eventProducer;
+
+    /**
+     * Creates a publisher together with its Kafka producer.
+     *
+     * @param brokerUrl value used for the producer's {@code bootstrap.servers} setting
+     * @param topicName topic every status record is sent to
+     */
+    public StatusPublisher(String brokerUrl, String topicName) {
+        this.brokerUrl = brokerUrl;
+        this.topicName = topicName;
+        // Built through a private helper so that no overridable method runs while the
+        // object is still under construction.
+        this.eventProducer = createProducer();
+    }
+
+    /**
+     * Sends one status record to the configured topic.
+     *
+     * <p>The send is asynchronous; a failure reported by the producer is written to
+     * standard error instead of being dropped silently.
+     *
+     * @param callbackWorkflowId workflow identifier placed first in the record
+     * @param status             status label placed second in the record
+     * @param message            free-text description placed last in the record
+     */
+    public void publishStatus(long callbackWorkflowId, String status, String message) {
+        String payload = String.join(",", Long.toString(callbackWorkflowId), status, message);
+        this.eventProducer.send(new ProducerRecord<String, String>(this.topicName, payload),
+                (metadata, exception) -> {
+                    if (exception != null) {
+                        System.err.println("Failed to publish status " + status + " for workflow "
+                                + callbackWorkflowId + " to topic " + this.topicName);
+                        exception.printStackTrace();
+                    }
+                });
+    }
+
+    /**
+     * Replaces the current Kafka producer with a freshly configured one.
+     *
+     * <p>The previous producer, if any, is closed after the swap so its resources are
+     * released. This method is not synchronized with {@link #publishStatus}.
+     */
+    public void initializeKafkaEventProducer() {
+        Producer<String, String> previous = this.eventProducer;
+        this.eventProducer = createProducer();
+        if (previous != null) {
+            previous.close();
+        }
+    }
+
+    /**
+     * Closes the underlying Kafka producer. The publisher must not be used afterwards.
+     */
+    @Override
+    public void close() {
+        if (this.eventProducer != null) {
+            this.eventProducer.close();
+        }
+    }
+
+    /** Builds a string-to-string producer for {@link #brokerUrl} with the fixed settings below. */
+    private Producer<String, String> createProducer() {
+        Properties props = new Properties();
+
+        props.put("bootstrap.servers", this.brokerUrl);
+
+        props.put("acks", "all");
+        props.put("retries", 0);
+        props.put("batch.size", 16384);
+        props.put("linger.ms", 1);
+        props.put("buffer.memory", 33554432);
+        props.put("key.serializer",
+                "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer",
+                "org.apache.kafka.common.serialization.StringSerializer");
+
+        return new KafkaProducer<String, String>(props);
+    }
+}
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/pom.xml b/airavata-kubernetes/modules/agents/thrift-agent/pom.xml
new file mode 100644
index 0000000..1bfb0b8
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Maven module "thrift-agent": a child of the airavata-kubernetes parent POM that
+ depends on libthrift, agent-core and helix-task-api.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-kubernetes</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>thrift-agent</artifactId>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <!-- java.version is not defined in this file; presumably inherited from the parent POM (confirm). -->
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <!-- https://mvnrepository.com/artifact/org.apache.thrift/libthrift -->
+ <!-- Same version number as the Thrift compiler named in the generated stubs' headers (0.10.0). -->
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.10.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>agent-core</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>helix-task-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/handler/OperationHandler.java b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/handler/OperationHandler.java
new file mode 100644
index 0000000..c402a83
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/handler/OperationHandler.java
@@ -0,0 +1,34 @@
+package org.apache.airavata.agents.thrift.handler;
+
+import org.apache.airavata.agents.core.StatusPublisher;
+import org.apache.airavata.agents.thrift.stubs.OperationException;
+import org.apache.airavata.agents.thrift.stubs.OperationService;
+import org.apache.thrift.TException;
+
+/**
+ * Thrift-side implementation of {@link OperationService.Iface} that reports command
+ * progress through the publishing methods inherited from {@link StatusPublisher}.
+ *
+ * <p>In its current form the handler does not run the command: the background thread
+ * only prints it and then reports success.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class OperationHandler extends StatusPublisher implements OperationService.Iface {
+
+    /**
+     * @param brokerUrl Kafka broker address handed to {@link StatusPublisher}
+     * @param topicName Kafka topic handed to {@link StatusPublisher}
+     */
+    public OperationHandler(String brokerUrl, String topicName) {
+        super(brokerUrl, topicName);
+    }
+
+    /**
+     * Publishes PENDING and STARTED for the workflow, then starts a new thread that
+     * prints the command and publishes SUCCESS. Returns without waiting for that thread.
+     *
+     * @param command            command text received from the caller
+     * @param callbackWorkflowId workflow identifier attached to every published status
+     */
+    @Override
+    public void executeCommand(String command, long callbackWorkflowId) throws OperationException, TException {
+        publishStatus(callbackWorkflowId, "PENDING", "Pending for execution");
+        publishStatus(callbackWorkflowId, "STARTED", "Starting command execution");
+
+        Thread worker = new Thread(() -> {
+            System.out.println("Executing command " + command);
+            publishStatus(callbackWorkflowId, "SUCCESS", "Command execution succeeded");
+        });
+        worker.start();
+    }
+}
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/operation/ThriftAgentOperation.java b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/operation/ThriftAgentOperation.java
new file mode 100644
index 0000000..cb9010b
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/operation/ThriftAgentOperation.java
@@ -0,0 +1,45 @@
+package org.apache.airavata.agents.thrift.operation;
+
+import org.apache.airavata.agents.core.AsyncOperation;
+import org.apache.airavata.agents.thrift.stubs.OperationService;
+import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * {@link AsyncOperation} that forwards commands to a remote agent through the
+ * Thrift {@link OperationService} client, using a binary protocol over a plain socket.
+ *
+ * <p>The connection is opened in the constructor. A connect failure is reported on
+ * standard error and remembered; the first call to {@link #executeCommandAsync} then
+ * fails with an {@link IllegalStateException} that carries the original cause.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class ThriftAgentOperation extends AsyncOperation implements AutoCloseable {
+
+    /** Port used when the caller does not specify one. */
+    private static final int DEFAULT_AGENT_PORT = 9090;
+
+    /** {@code host:port} of the agent, kept for error messages. */
+    private final String endpoint;
+    private final TTransport transport;
+    /** Connected client, or {@code null} when the connection could not be opened. */
+    private final OperationService.Client client;
+    /** Reason the connection could not be opened, or {@code null} on success. */
+    private final TTransportException connectFailure;
+
+    /**
+     * Connects to the agent on the compute resource's host using the default port (9090).
+     *
+     * @param computeResource resource whose host runs the agent
+     */
+    public ThriftAgentOperation(ComputeResource computeResource) {
+        this(computeResource, DEFAULT_AGENT_PORT);
+    }
+
+    /**
+     * Connects to the agent on the compute resource's host using the given port.
+     *
+     * @param computeResource resource whose host runs the agent
+     * @param port            TCP port the agent listens on
+     */
+    public ThriftAgentOperation(ComputeResource computeResource, int port) {
+        super(computeResource);
+        this.endpoint = computeResource.getHost() + ":" + port;
+
+        TTransport socket = new TSocket(computeResource.getHost(), port);
+        OperationService.Client connectedClient = null;
+        TTransportException failure = null;
+        try {
+            socket.open();
+            TProtocol protocol = new TBinaryProtocol(socket);
+            connectedClient = new OperationService.Client(protocol);
+        } catch (TTransportException e) {
+            // Construction still completes, as before; the failure is kept so that later
+            // calls can report what went wrong instead of hitting a null client.
+            e.printStackTrace();
+            socket.close();
+            failure = e;
+        }
+        this.transport = socket;
+        this.client = connectedClient;
+        this.connectFailure = failure;
+    }
+
+    /**
+     * Sends the command to the agent.
+     *
+     * @param command            command to execute remotely
+     * @param callbackWorkflowId workflow identifier forwarded to the agent
+     * @throws IllegalStateException if the connection could not be opened at construction
+     */
+    @Override
+    public void executeCommandAsync(String command, long callbackWorkflowId) {
+        if (client == null) {
+            throw new IllegalStateException("Not connected to agent at " + endpoint, connectFailure);
+        }
+        try {
+            client.executeCommand(command, callbackWorkflowId);
+        } catch (TException e) {
+            // NOTE(review): the failure is only printed, so the caller cannot tell that the
+            // command never reached the agent. Kept as-is to preserve the existing contract.
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Closes the socket to the agent. The operation must not be used afterwards.
+     */
+    @Override
+    public void close() {
+        transport.close();
+    }
+}
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/server/OperationServer.java b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/server/OperationServer.java
new file mode 100644
index 0000000..4eecd8f
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/server/OperationServer.java
@@ -0,0 +1,51 @@
+package org.apache.airavata.agents.thrift.server;
+
+import org.apache.airavata.agents.thrift.handler.OperationHandler;
+import org.apache.airavata.agents.thrift.stubs.OperationService;
+import org.apache.airavata.helix.api.PropertyResolver;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TSimpleServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+
+import java.io.IOException;
+
+/**
+ * Entry point that exposes {@link OperationHandler} as a Thrift {@link OperationService}.
+ *
+ * <p>Reads {@code kafka.bootstrap.url} and {@code async.event.listener.topic} from
+ * {@code application.properties} on the classpath, then serves requests on port 9090
+ * from a separate thread.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class OperationServer {
+
+    /** Classpath resource holding the server configuration. */
+    private static final String CONFIG_RESOURCE = "application.properties";
+    /** TCP port the Thrift server listens on. */
+    private static final int SERVER_PORT = 9090;
+
+    private static OperationHandler operationHandler;
+    private static OperationService.Processor processor;
+
+    /**
+     * Loads the configuration, builds the handler and processor, and starts the server thread.
+     *
+     * @param args ignored
+     * @throws IOException if the configuration resource is missing or cannot be read
+     */
+    public static void main(String[] args) throws IOException {
+        PropertyResolver resolver = new PropertyResolver();
+        // try-with-resources closes the stream even when loading fails part-way through.
+        try (java.io.InputStream config =
+                     OperationService.class.getClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
+            if (config == null) {
+                throw new IOException(CONFIG_RESOURCE + " was not found on the classpath");
+            }
+            resolver.loadInputStream(config);
+        }
+
+        operationHandler = new OperationHandler(resolver.get("kafka.bootstrap.url"), resolver.get("async.event.listener.topic"));
+        processor = new OperationService.Processor(operationHandler);
+
+        Runnable server = () -> simple(processor);
+        new Thread(server).start();
+    }
+
+    /**
+     * Serves the given processor with a {@link TSimpleServer}; blocks until the server stops.
+     *
+     * <p>Any failure is reported on standard error and the method returns.
+     *
+     * @param processor Thrift processor that dispatches incoming calls
+     */
+    public static void simple(OperationService.Processor processor) {
+        try {
+            TServerTransport serverTransport = new TServerSocket(SERVER_PORT);
+            TServer server = new TSimpleServer(new TServer.Args(serverTransport).processor(processor));
+
+            System.out.println("Starting the operation server...");
+            server.serve();
+        } catch (Exception e) {
+            System.err.println("Operation server on port " + SERVER_PORT + " stopped with an error");
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/stubs/OperationException.java b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/stubs/OperationException.java
new file mode 100644
index 0000000..0ab63f5
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/stubs/OperationException.java
@@ -0,0 +1,476 @@
+/**
+ * Autogenerated by Thrift Compiler (0.10.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.agents.thrift.stubs;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.10.0)", date = "2017-12-02")
+public class OperationException extends org.apache.thrift.TException implements org.apache.thrift.TBase<OperationException, OperationException._Fields>, java.io.Serializable, Cloneable, Comparable<OperationException> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OperationException");
+
+ private static final org.apache.thrift.protocol.TField MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("message", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField STACKTRACE_FIELD_DESC = new org.apache.thrift.protocol.TField("stacktrace", org.apache.thrift.protocol.TType.STRING, (short)2);
+
+ private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new OperationExceptionStandardSchemeFactory();
+ private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new OperationExceptionTupleSchemeFactory();
+
+ public java.lang.String message; // required
+ public java.lang.String stacktrace; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ MESSAGE((short)1, "message"),
+ STACKTRACE((short)2, "stacktrace");
+
+ private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+ static {
+ for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // MESSAGE
+ return MESSAGE;
+ case 2: // STACKTRACE
+ return STACKTRACE;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(java.lang.String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final java.lang.String _fieldName;
+
+ _Fields(short thriftId, java.lang.String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public java.lang.String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("message", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.STACKTRACE, new org.apache.thrift.meta_data.FieldMetaData("stacktrace", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(OperationException.class, metaDataMap);
+ }
+
+ public OperationException() {
+ }
+
+ public OperationException(
+ java.lang.String message,
+ java.lang.String stacktrace)
+ {
+ this();
+ this.message = message;
+ this.stacktrace = stacktrace;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public OperationException(OperationException other) {
+ if (other.isSetMessage()) {
+ this.message = other.message;
+ }
+ if (other.isSetStacktrace()) {
+ this.stacktrace = other.stacktrace;
+ }
+ }
+
+ public OperationException deepCopy() {
+ return new OperationException(this);
+ }
+
+ @Override
+ public void clear() {
+ this.message = null;
+ this.stacktrace = null;
+ }
+
+ public java.lang.String getMessage() {
+ return this.message;
+ }
+
+ public OperationException setMessage(java.lang.String message) {
+ this.message = message;
+ return this;
+ }
+
+ public void unsetMessage() {
+ this.message = null;
+ }
+
+ /** Returns true if field message is set (has been assigned a value) and false otherwise */
+ public boolean isSetMessage() {
+ return this.message != null;
+ }
+
+ public void setMessageIsSet(boolean value) {
+ if (!value) {
+ this.message = null;
+ }
+ }
+
+ public java.lang.String getStacktrace() {
+ return this.stacktrace;
+ }
+
+ public OperationException setStacktrace(java.lang.String stacktrace) {
+ this.stacktrace = stacktrace;
+ return this;
+ }
+
+ public void unsetStacktrace() {
+ this.stacktrace = null;
+ }
+
+ /** Returns true if field stacktrace is set (has been assigned a value) and false otherwise */
+ public boolean isSetStacktrace() {
+ return this.stacktrace != null;
+ }
+
+ public void setStacktraceIsSet(boolean value) {
+ if (!value) {
+ this.stacktrace = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, java.lang.Object value) {
+ switch (field) {
+ case MESSAGE:
+ if (value == null) {
+ unsetMessage();
+ } else {
+ setMessage((java.lang.String)value);
+ }
+ break;
+
+ case STACKTRACE:
+ if (value == null) {
+ unsetStacktrace();
+ } else {
+ setStacktrace((java.lang.String)value);
+ }
+ break;
+
+ }
+ }
+
+ public java.lang.Object getFieldValue(_Fields field) {
+ switch (field) {
+ case MESSAGE:
+ return getMessage();
+
+ case STACKTRACE:
+ return getStacktrace();
+
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new java.lang.IllegalArgumentException();
+ }
+
+ switch (field) {
+ case MESSAGE:
+ return isSetMessage();
+ case STACKTRACE:
+ return isSetStacktrace();
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(java.lang.Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof OperationException)
+ return this.equals((OperationException)that);
+ return false;
+ }
+
+ public boolean equals(OperationException that) {
+ if (that == null)
+ return false;
+ if (this == that)
+ return true;
+
+ boolean this_present_message = true && this.isSetMessage();
+ boolean that_present_message = true && that.isSetMessage();
+ if (this_present_message || that_present_message) {
+ if (!(this_present_message && that_present_message))
+ return false;
+ if (!this.message.equals(that.message))
+ return false;
+ }
+
+ boolean this_present_stacktrace = true && this.isSetStacktrace();
+ boolean that_present_stacktrace = true && that.isSetStacktrace();
+ if (this_present_stacktrace || that_present_stacktrace) {
+ if (!(this_present_stacktrace && that_present_stacktrace))
+ return false;
+ if (!this.stacktrace.equals(that.stacktrace))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int hashCode = 1;
+
+ hashCode = hashCode * 8191 + ((isSetMessage()) ? 131071 : 524287);
+ if (isSetMessage())
+ hashCode = hashCode * 8191 + message.hashCode();
+
+ hashCode = hashCode * 8191 + ((isSetStacktrace()) ? 131071 : 524287);
+ if (isSetStacktrace())
+ hashCode = hashCode * 8191 + stacktrace.hashCode();
+
+ return hashCode;
+ }
+
+ @Override
+ public int compareTo(OperationException other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = java.lang.Boolean.valueOf(isSetMessage()).compareTo(other.isSetMessage());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetMessage()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.message, other.message);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = java.lang.Boolean.valueOf(isSetStacktrace()).compareTo(other.isSetStacktrace());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetStacktrace()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.stacktrace, other.stacktrace);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ scheme(iprot).read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ scheme(oprot).write(oprot, this);
+ }
+
+ @Override
+ public java.lang.String toString() {
+ java.lang.StringBuilder sb = new java.lang.StringBuilder("OperationException(");
+ boolean first = true;
+
+ sb.append("message:");
+ if (this.message == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.message);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("stacktrace:");
+ if (this.stacktrace == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.stacktrace);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class OperationExceptionStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+ public OperationExceptionStandardScheme getScheme() {
+ return new OperationExceptionStandardScheme();
+ }
+ }
+
+ private static class OperationExceptionStandardScheme extends org.apache.thrift.scheme.StandardScheme<OperationException> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, OperationException struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // MESSAGE
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.message = iprot.readString();
+ struct.setMessageIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // STACKTRACE
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.stacktrace = iprot.readString();
+ struct.setStacktraceIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, OperationException struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.message != null) {
+ oprot.writeFieldBegin(MESSAGE_FIELD_DESC);
+ oprot.writeString(struct.message);
+ oprot.writeFieldEnd();
+ }
+ if (struct.stacktrace != null) {
+ oprot.writeFieldBegin(STACKTRACE_FIELD_DESC);
+ oprot.writeString(struct.stacktrace);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class OperationExceptionTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+ public OperationExceptionTupleScheme getScheme() {
+ return new OperationExceptionTupleScheme();
+ }
+ }
+
+ private static class OperationExceptionTupleScheme extends org.apache.thrift.scheme.TupleScheme<OperationException> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, OperationException struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet optionals = new java.util.BitSet();
+ if (struct.isSetMessage()) {
+ optionals.set(0);
+ }
+ if (struct.isSetStacktrace()) {
+ optionals.set(1);
+ }
+ oprot.writeBitSet(optionals, 2);
+ if (struct.isSetMessage()) {
+ oprot.writeString(struct.message);
+ }
+ if (struct.isSetStacktrace()) {
+ oprot.writeString(struct.stacktrace);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, OperationException struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet incoming = iprot.readBitSet(2);
+ if (incoming.get(0)) {
+ struct.message = iprot.readString();
+ struct.setMessageIsSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.stacktrace = iprot.readString();
+ struct.setStacktraceIsSet(true);
+ }
+ }
+ }
+
+ private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+ return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+ }
+}
+
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/stubs/OperationService.java b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/stubs/OperationService.java
new file mode 100644
index 0000000..467daac
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/java/org/apache/airavata/agents/thrift/stubs/OperationService.java
@@ -0,0 +1,1072 @@
+/**
+ * Autogenerated by Thrift Compiler (0.10.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.agents.thrift.stubs;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.10.0)", date = "2017-12-02")
+public class OperationService {
+
+ /**
+  * Synchronous contract of the operation service; implemented by {@link Client} on the calling
+  * side and invoked by {@link Processor} on the serving side.
+  */
+ public interface Iface {
+
+   /**
+    * Invokes the {@code executeCommand} operation.
+    *
+    * @param command            command string handed to the handler (format is handler-defined — TODO confirm against schema.thrift)
+    * @param callbackWorkflowId workflow identifier passed through to the handler (presumably used for a completion callback — verify)
+    * @throws OperationException          declared service failure
+    * @throws org.apache.thrift.TException protocol or transport failure
+    */
+   void executeCommand(java.lang.String command, long callbackWorkflowId) throws OperationException, org.apache.thrift.TException;
+
+ }
+
+ /**
+  * Callback-based contract of the operation service; implemented by {@link AsyncClient} and
+  * invoked by {@link AsyncProcessor}.
+  */
+ public interface AsyncIface {
+
+   /**
+    * Starts the {@code executeCommand} operation and reports its outcome through {@code resultHandler}.
+    *
+    * @param command            command string handed to the handler (format is handler-defined — TODO confirm against schema.thrift)
+    * @param callbackWorkflowId workflow identifier passed through to the handler (presumably used for a completion callback — verify)
+    * @param resultHandler      callback that receives completion or the failure
+    * @throws org.apache.thrift.TException if the call cannot be started
+    */
+   void executeCommand(java.lang.String command, long callbackWorkflowId, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException;
+
+ }
+
+ /**
+  * Blocking client for the operation service. Each call is a send step followed by a
+  * receive step; both steps are also exposed individually.
+  */
+ public static class Client extends org.apache.thrift.TServiceClient implements Iface {
+
+   /** Creates {@link Client} instances from one protocol or from an input/output protocol pair. */
+   public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
+     public Factory() {}
+
+     public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
+       return new Client(prot);
+     }
+
+     public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
+       return new Client(iprot, oprot);
+     }
+   }
+
+   /** Uses {@code prot} for both reading and writing. */
+   public Client(org.apache.thrift.protocol.TProtocol prot) {
+     super(prot, prot);
+   }
+
+   /** Uses {@code iprot} for reading replies and {@code oprot} for writing requests. */
+   public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
+     super(iprot, oprot);
+   }
+
+   /** Sends the request and then blocks on the reply, rethrowing a declared {@link OperationException}. */
+   @Override
+   public void executeCommand(java.lang.String command, long callbackWorkflowId) throws OperationException, org.apache.thrift.TException {
+     send_executeCommand(command, callbackWorkflowId);
+     recv_executeCommand();
+   }
+
+   /** Packs both arguments into an {@code executeCommand_args} struct and sends it under the name "executeCommand". */
+   public void send_executeCommand(java.lang.String command, long callbackWorkflowId) throws org.apache.thrift.TException {
+     executeCommand_args request = new executeCommand_args()
+         .setCommand(command)
+         .setCallbackWorkflowId(callbackWorkflowId);
+     sendBase("executeCommand", request);
+   }
+
+   /** Receives the "executeCommand" reply; throws the carried {@link OperationException} when one is present. */
+   public void recv_executeCommand() throws OperationException, org.apache.thrift.TException {
+     executeCommand_result reply = new executeCommand_result();
+     receiveBase(reply, "executeCommand");
+     OperationException declaredFailure = reply.ex;
+     if (declaredFailure != null) {
+       throw declaredFailure;
+     }
+   }
+
+ }
+ /**
+  * Non-blocking client for the operation service. Each invocation is wrapped in an
+  * {@link executeCommand_call} and handed to the client manager.
+  */
+ public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
+
+   /** Creates {@link AsyncClient} instances that share one client manager and protocol factory. */
+   public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
+     private final org.apache.thrift.async.TAsyncClientManager clientManager;
+     private final org.apache.thrift.protocol.TProtocolFactory protocolFactory;
+
+     public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
+       this.clientManager = clientManager;
+       this.protocolFactory = protocolFactory;
+     }
+
+     public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
+       return new AsyncClient(protocolFactory, clientManager, transport);
+     }
+   }
+
+   public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
+     super(protocolFactory, clientManager, transport);
+   }
+
+   /**
+    * Queues an {@code executeCommand} call; the outcome is delivered to {@code resultHandler}.
+    *
+    * @throws org.apache.thrift.TException if the call object cannot be created or submitted
+    */
+   @Override
+   public void executeCommand(java.lang.String command, long callbackWorkflowId, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException {
+     checkReady();
+     executeCommand_call method_call = new executeCommand_call(command, callbackWorkflowId, resultHandler, this, ___protocolFactory, ___transport);
+     this.___currentMethod = method_call;
+     ___manager.call(method_call);
+   }
+
+   /** One in-flight {@code executeCommand} request: serializes the arguments and decodes the reply. */
+   public static class executeCommand_call extends org.apache.thrift.async.TAsyncMethodCall<Void> {
+     private final java.lang.String command;
+     private final long callbackWorkflowId;
+
+     public executeCommand_call(java.lang.String command, long callbackWorkflowId, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+       // The trailing 'false' marks the call as not one-way, so a reply is expected.
+       super(client, protocolFactory, transport, resultHandler, false);
+       this.command = command;
+       this.callbackWorkflowId = callbackWorkflowId;
+     }
+
+     /** Writes the CALL message (sequence id 0) carrying both arguments. */
+     public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+       prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("executeCommand", org.apache.thrift.protocol.TMessageType.CALL, 0));
+       executeCommand_args args = new executeCommand_args();
+       args.setCommand(command);
+       args.setCallbackWorkflowId(callbackWorkflowId);
+       args.write(prot);
+       prot.writeMessageEnd();
+     }
+
+     /**
+      * Decodes the buffered reply.
+      *
+      * @return always {@code null}; the operation has no return value
+      * @throws OperationException              if the reply carries the declared service exception
+      * @throws org.apache.thrift.TException     if the reply is a protocol-level exception or cannot be decoded
+      * @throws java.lang.IllegalStateException if the response has not been fully read yet
+      */
+     public Void getResult() throws OperationException, org.apache.thrift.TException {
+       if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+         throw new java.lang.IllegalStateException("Method call not finished!");
+       }
+       org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+       org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+       // The reply must actually be parsed: returning null straight away would silently discard
+       // an OperationException (or TApplicationException) sent back by the server.
+       (new Client(prot)).recv_executeCommand();
+       return null;
+     }
+   }
+
+ }
+
+ /**
+  * Server-side dispatcher for blocking handlers: registers the "executeCommand" function and
+  * forwards decoded arguments to the supplied {@link Iface} implementation.
+  */
+ public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
+   private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName());
+
+   /** Builds a processor around {@code iface} with a fresh function table. */
+   public Processor(I iface) {
+     super(iface, getProcessMap(new java.util.HashMap<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
+   }
+
+   /** Builds a processor around {@code iface}, adding this service's functions to a caller-supplied table. */
+   protected Processor(I iface, java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
+     super(iface, getProcessMap(processMap));
+   }
+
+   /** Adds the "executeCommand" entry to {@code registry} and returns that same map. */
+   private static <I extends Iface> java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> registry) {
+     registry.put("executeCommand", new executeCommand<I>());
+     return registry;
+   }
+
+   /** Process function for "executeCommand": a two-way call whose only possible payload is a declared exception. */
+   public static class executeCommand<I extends Iface> extends org.apache.thrift.ProcessFunction<I, executeCommand_args> {
+     public executeCommand() {
+       super("executeCommand");
+     }
+
+     public executeCommand_args getEmptyArgsInstance() {
+       return new executeCommand_args();
+     }
+
+     protected boolean isOneway() {
+       return false;
+     }
+
+     /**
+      * Runs the handler and captures a thrown {@link OperationException} in the result struct,
+      * so it travels back to the caller as a normal reply.
+      */
+     public executeCommand_result getResult(I iface, executeCommand_args args) throws org.apache.thrift.TException {
+       final executeCommand_result reply = new executeCommand_result();
+       try {
+         iface.executeCommand(args.getCommand(), args.getCallbackWorkflowId());
+       } catch (OperationException declaredFailure) {
+         reply.setEx(declaredFailure);
+       }
+       return reply;
+     }
+   }
+
+ }
+
+ /**
+  * Server-side dispatcher for callback-style handlers: registers the "executeCommand" function
+  * and forwards decoded arguments, plus a result callback, to the supplied {@link AsyncIface}.
+  */
+ public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> {
+ private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(AsyncProcessor.class.getName());
+ /** Builds a processor around {@code iface} with a fresh function table. */
+ public AsyncProcessor(I iface) {
+ super(iface, getProcessMap(new java.util.HashMap<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>()));
+ }
+
+ /** Builds a processor around {@code iface}, adding this service's functions to a caller-supplied table. */
+ protected AsyncProcessor(I iface, java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
+ super(iface, getProcessMap(processMap));
+ }
+
+ /** Adds the "executeCommand" entry to {@code processMap} and returns that same map. */
+ private static <I extends AsyncIface> java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase,?>> getProcessMap(java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
+ processMap.put("executeCommand", new executeCommand());
+ return processMap;
+ }
+
+ /** Async process function for "executeCommand"; it is two-way, so every outcome except a transport failure produces a response. */
+ public static class executeCommand<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, executeCommand_args, Void> {
+ public executeCommand() {
+ super("executeCommand");
+ }
+
+ public executeCommand_args getEmptyArgsInstance() {
+ return new executeCommand_args();
+ }
+
+ /**
+  * Creates the callback that turns the handler's outcome into a response written to {@code fb}
+  * under sequence id {@code seqid}.
+  */
+ public org.apache.thrift.async.AsyncMethodCallback<Void> getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) {
+ final org.apache.thrift.AsyncProcessFunction fcall = this;
+ return new org.apache.thrift.async.AsyncMethodCallback<Void>() {
+ // Success path: reply with an empty result struct.
+ public void onComplete(Void o) {
+ executeCommand_result result = new executeCommand_result();
+ try {
+ fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+ } catch (org.apache.thrift.transport.TTransportException e) {
+ // The frame buffer could not be written; log and close it.
+ _LOGGER.error("TTransportException writing to internal frame buffer", e);
+ fb.close();
+ } catch (java.lang.Exception e) {
+ // Any other failure while replying is reported through the error path below.
+ _LOGGER.error("Exception writing to internal frame buffer", e);
+ onError(e);
+ }
+ }
+ // Failure path: the instanceof checks run in this order and the first match decides the response.
+ public void onError(java.lang.Exception e) {
+ byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+ org.apache.thrift.TSerializable msg;
+ executeCommand_result result = new executeCommand_result();
+ if (e instanceof OperationException) {
+ // Declared service exception: carried inside a normal REPLY.
+ result.ex = (OperationException) e;
+ result.setExIsSet(true);
+ msg = result;
+ } else if (e instanceof org.apache.thrift.transport.TTransportException) {
+ // Transport failure: log, close the frame buffer and send nothing.
+ _LOGGER.error("TTransportException inside handler", e);
+ fb.close();
+ return;
+ } else if (e instanceof org.apache.thrift.TApplicationException) {
+ // Already a Thrift application exception: forwarded unchanged as an EXCEPTION message.
+ _LOGGER.error("TApplicationException inside handler", e);
+ msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+ msg = (org.apache.thrift.TApplicationException)e;
+ } else {
+ // Anything else is wrapped as INTERNAL_ERROR; only e.getMessage() is forwarded.
+ _LOGGER.error("Exception inside handler", e);
+ msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+ msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+ }
+ try {
+ fcall.sendResponse(fb,msg,msgType,seqid);
+ } catch (java.lang.Exception ex) {
+ // The error response itself could not be written; log and close the frame buffer.
+ _LOGGER.error("Exception writing to internal frame buffer", ex);
+ fb.close();
+ }
+ }
+ };
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ /** Invokes the handler with the decoded arguments; the outcome arrives through {@code resultHandler}. */
+ public void start(I iface, executeCommand_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException {
+ iface.executeCommand(args.command, args.callbackWorkflowId,resultHandler);
+ }
+ }
+
+ }
+
+ /**
+  * Argument struct for the "executeCommand" call: a {@code command} string (field id 1) and a
+  * {@code callbackWorkflowId} i64 (field id 2). {@code command} counts as set when it is non-null;
+  * {@code callbackWorkflowId} is a primitive, so whether it is set is tracked in {@code __isset_bitfield}.
+  */
+ public static class executeCommand_args implements org.apache.thrift.TBase<executeCommand_args, executeCommand_args._Fields>, java.io.Serializable, Cloneable, Comparable<executeCommand_args> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("executeCommand_args");
+
+ // Wire descriptors: field name, Thrift type and field id.
+ private static final org.apache.thrift.protocol.TField COMMAND_FIELD_DESC = new org.apache.thrift.protocol.TField("command", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField CALLBACK_WORKFLOW_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("callbackWorkflowId", org.apache.thrift.protocol.TType.I64, (short)2);
+
+ // Codec factories; scheme(...) at the bottom of this class chooses between them per protocol.
+ private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new executeCommand_argsStandardSchemeFactory();
+ private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new executeCommand_argsTupleSchemeFactory();
+
+ public java.lang.String command; // required
+ public long callbackWorkflowId; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ COMMAND((short)1, "command"),
+ CALLBACK_WORKFLOW_ID((short)2, "callbackWorkflowId");
+
+ // Name-to-constant lookup, filled once in the static initializer below.
+ private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+ static {
+ for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it is not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // COMMAND
+ return COMMAND;
+ case 2: // CALLBACK_WORKFLOW_ID
+ return CALLBACK_WORKFLOW_ID;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it is not found.
+ */
+ public static _Fields findByName(java.lang.String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final java.lang.String _fieldName;
+
+ _Fields(short thriftId, java.lang.String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public java.lang.String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ // Bit 0 of __isset_bitfield records whether callbackWorkflowId has been assigned.
+ private static final int __CALLBACKWORKFLOWID_ISSET_ID = 0;
+ private byte __isset_bitfield = 0;
+ // Per-field metadata, also registered globally for this class in the static initializer.
+ public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.COMMAND, new org.apache.thrift.meta_data.FieldMetaData("command", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.CALLBACK_WORKFLOW_ID, new org.apache.thrift.meta_data.FieldMetaData("callbackWorkflowId", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(executeCommand_args.class, metaDataMap);
+ }
+
+ /** Creates an instance with no field set. */
+ public executeCommand_args() {
+ }
+
+ /** Creates an instance with both fields assigned; callbackWorkflowId is marked as set. */
+ public executeCommand_args(
+ java.lang.String command,
+ long callbackWorkflowId)
+ {
+ this();
+ this.command = command;
+ this.callbackWorkflowId = callbackWorkflowId;
+ setCallbackWorkflowIdIsSet(true);
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public executeCommand_args(executeCommand_args other) {
+ // Copy the isset bits together with the values they describe.
+ __isset_bitfield = other.__isset_bitfield;
+ if (other.isSetCommand()) {
+ this.command = other.command;
+ }
+ this.callbackWorkflowId = other.callbackWorkflowId;
+ }
+
+ public executeCommand_args deepCopy() {
+ return new executeCommand_args(this);
+ }
+
+ /** Resets both fields to their unset state (null command, id 0 with the isset bit cleared). */
+ @Override
+ public void clear() {
+ this.command = null;
+ setCallbackWorkflowIdIsSet(false);
+ this.callbackWorkflowId = 0;
+ }
+
+ public java.lang.String getCommand() {
+ return this.command;
+ }
+
+ public executeCommand_args setCommand(java.lang.String command) {
+ this.command = command;
+ return this;
+ }
+
+ public void unsetCommand() {
+ this.command = null;
+ }
+
+ /** Returns true if field command is set (has been assigned a value) and false otherwise */
+ public boolean isSetCommand() {
+ return this.command != null;
+ }
+
+ /** Passing false clears command; passing true does nothing, because set-ness is derived from non-null. */
+ public void setCommandIsSet(boolean value) {
+ if (!value) {
+ this.command = null;
+ }
+ }
+
+ public long getCallbackWorkflowId() {
+ return this.callbackWorkflowId;
+ }
+
+ /** Assigns the id and marks it as set; returns this instance for chaining. */
+ public executeCommand_args setCallbackWorkflowId(long callbackWorkflowId) {
+ this.callbackWorkflowId = callbackWorkflowId;
+ setCallbackWorkflowIdIsSet(true);
+ return this;
+ }
+
+ /** Clears only the isset bit; the stored long value is left as it is. */
+ public void unsetCallbackWorkflowId() {
+ __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __CALLBACKWORKFLOWID_ISSET_ID);
+ }
+
+ /** Returns true if field callbackWorkflowId is set (has been assigned a value) and false otherwise */
+ public boolean isSetCallbackWorkflowId() {
+ return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __CALLBACKWORKFLOWID_ISSET_ID);
+ }
+
+ public void setCallbackWorkflowIdIsSet(boolean value) {
+ __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __CALLBACKWORKFLOWID_ISSET_ID, value);
+ }
+
+ /** Generic setter keyed by field; a null value unsets the field. */
+ public void setFieldValue(_Fields field, java.lang.Object value) {
+ switch (field) {
+ case COMMAND:
+ if (value == null) {
+ unsetCommand();
+ } else {
+ setCommand((java.lang.String)value);
+ }
+ break;
+
+ case CALLBACK_WORKFLOW_ID:
+ if (value == null) {
+ unsetCallbackWorkflowId();
+ } else {
+ setCallbackWorkflowId((java.lang.Long)value);
+ }
+ break;
+
+ }
+ }
+
+ /** Generic getter keyed by field; the long id is returned boxed. */
+ public java.lang.Object getFieldValue(_Fields field) {
+ switch (field) {
+ case COMMAND:
+ return getCommand();
+
+ case CALLBACK_WORKFLOW_ID:
+ return getCallbackWorkflowId();
+
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new java.lang.IllegalArgumentException();
+ }
+
+ switch (field) {
+ case COMMAND:
+ return isSetCommand();
+ case CALLBACK_WORKFLOW_ID:
+ return isSetCallbackWorkflowId();
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(java.lang.Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof executeCommand_args)
+ return this.equals((executeCommand_args)that);
+ return false;
+ }
+
+ /**
+  * Field-wise equality. command must be set on both sides or on neither; callbackWorkflowId is
+  * compared by value only, because both "present" flags below are hard-coded to true and the
+  * isset bit is therefore ignored.
+  */
+ public boolean equals(executeCommand_args that) {
+ if (that == null)
+ return false;
+ if (this == that)
+ return true;
+
+ boolean this_present_command = true && this.isSetCommand();
+ boolean that_present_command = true && that.isSetCommand();
+ if (this_present_command || that_present_command) {
+ if (!(this_present_command && that_present_command))
+ return false;
+ if (!this.command.equals(that.command))
+ return false;
+ }
+
+ boolean this_present_callbackWorkflowId = true;
+ boolean that_present_callbackWorkflowId = true;
+ if (this_present_callbackWorkflowId || that_present_callbackWorkflowId) {
+ if (!(this_present_callbackWorkflowId && that_present_callbackWorkflowId))
+ return false;
+ if (this.callbackWorkflowId != that.callbackWorkflowId)
+ return false;
+ }
+
+ return true;
+ }
+
+ /** Hash over the same inputs equals() uses: command's presence and value, then the id value. */
+ @Override
+ public int hashCode() {
+ int hashCode = 1;
+
+ hashCode = hashCode * 8191 + ((isSetCommand()) ? 131071 : 524287);
+ if (isSetCommand())
+ hashCode = hashCode * 8191 + command.hashCode();
+
+ hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(callbackWorkflowId);
+
+ return hashCode;
+ }
+
+ /**
+  * Orders field by field: for each one, set-ness is compared first, then the value when set.
+  * Unlike equals(), this consults the isset bit of callbackWorkflowId.
+  */
+ @Override
+ public int compareTo(executeCommand_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = java.lang.Boolean.valueOf(isSetCommand()).compareTo(other.isSetCommand());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetCommand()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.command, other.command);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = java.lang.Boolean.valueOf(isSetCallbackWorkflowId()).compareTo(other.isSetCallbackWorkflowId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetCallbackWorkflowId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.callbackWorkflowId, other.callbackWorkflowId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ /** Deserializes into this instance using the codec selected for {@code iprot}. */
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ scheme(iprot).read(iprot, this);
+ }
+
+ /** Serializes this instance using the codec selected for {@code oprot}. */
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ scheme(oprot).write(oprot, this);
+ }
+
+ /** Renders as {@code executeCommand_args(command:<value or null>, callbackWorkflowId:<value>)}. */
+ @Override
+ public java.lang.String toString() {
+ java.lang.StringBuilder sb = new java.lang.StringBuilder("executeCommand_args(");
+ boolean first = true;
+
+ sb.append("command:");
+ if (this.command == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.command);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("callbackWorkflowId:");
+ sb.append(this.callbackWorkflowId);
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ /** No checks are performed for this struct; the body is intentionally empty. */
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ // Java serialization hook: the struct is written with TCompactProtocol over the object stream.
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ // Java deserialization hook: resets the isset bits, then reads the TCompactProtocol form back.
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+ try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bitfield = 0;
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class executeCommand_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+ public executeCommand_argsStandardScheme getScheme() {
+ return new executeCommand_argsStandardScheme();
+ }
+ }
+
+ /** Field-tagged codec: every field travels with its id and type, terminated by a STOP marker. */
+ private static class executeCommand_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme<executeCommand_args> {
+
+ // Reads fields until STOP; a field with an unknown id or an unexpected type is skipped.
+ public void read(org.apache.thrift.protocol.TProtocol iprot, executeCommand_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // COMMAND
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.command = iprot.readString();
+ struct.setCommandIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // CALLBACK_WORKFLOW_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.callbackWorkflowId = iprot.readI64();
+ struct.setCallbackWorkflowIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ // Writes command only when it is non-null; callbackWorkflowId is always written, set or not.
+ public void write(org.apache.thrift.protocol.TProtocol oprot, executeCommand_args struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.command != null) {
+ oprot.writeFieldBegin(COMMAND_FIELD_DESC);
+ oprot.writeString(struct.command);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldBegin(CALLBACK_WORKFLOW_ID_FIELD_DESC);
+ oprot.writeI64(struct.callbackWorkflowId);
+ oprot.writeFieldEnd();
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class executeCommand_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+ public executeCommand_argsTupleScheme getScheme() {
+ return new executeCommand_argsTupleScheme();
+ }
+ }
+
+ /** Tuple codec: a two-bit presence mask (bit 0 = command, bit 1 = callbackWorkflowId), then the present values in that order. */
+ private static class executeCommand_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme<executeCommand_args> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, executeCommand_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet optionals = new java.util.BitSet();
+ if (struct.isSetCommand()) {
+ optionals.set(0);
+ }
+ if (struct.isSetCallbackWorkflowId()) {
+ optionals.set(1);
+ }
+ oprot.writeBitSet(optionals, 2);
+ if (struct.isSetCommand()) {
+ oprot.writeString(struct.command);
+ }
+ if (struct.isSetCallbackWorkflowId()) {
+ oprot.writeI64(struct.callbackWorkflowId);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, executeCommand_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet incoming = iprot.readBitSet(2);
+ if (incoming.get(0)) {
+ struct.command = iprot.readString();
+ struct.setCommandIsSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.callbackWorkflowId = iprot.readI64();
+ struct.setCallbackWorkflowIdIsSet(true);
+ }
+ }
+ }
+
+ // Selects the standard codec when the protocol reports StandardScheme, otherwise the tuple codec.
+ private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+ return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+ }
+ }
+
+ public static class executeCommand_result implements org.apache.thrift.TBase<executeCommand_result, executeCommand_result._Fields>, java.io.Serializable, Cloneable, Comparable<executeCommand_result> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("executeCommand_result");
+
+ private static final org.apache.thrift.protocol.TField EX_FIELD_DESC = new org.apache.thrift.protocol.TField("ex", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+ private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new executeCommand_resultStandardSchemeFactory();
+ private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new executeCommand_resultTupleSchemeFactory();
+
+ public OperationException ex; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ EX((short)1, "ex");
+
+ private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+ static {
+ for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // EX
+ return EX;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(java.lang.String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final java.lang.String _fieldName;
+
+ _Fields(short thriftId, java.lang.String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public java.lang.String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.EX, new org.apache.thrift.meta_data.FieldMetaData("ex", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, OperationException.class)));
+ metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(executeCommand_result.class, metaDataMap);
+ }
+
+ public executeCommand_result() {
+ }
+
+ public executeCommand_result(
+ OperationException ex)
+ {
+ this();
+ this.ex = ex;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public executeCommand_result(executeCommand_result other) {
+ if (other.isSetEx()) {
+ this.ex = new OperationException(other.ex);
+ }
+ }
+
+ public executeCommand_result deepCopy() {
+ return new executeCommand_result(this);
+ }
+
+ @Override
+ public void clear() {
+ this.ex = null;
+ }
+
+ public OperationException getEx() {
+ return this.ex;
+ }
+
+ public executeCommand_result setEx(OperationException ex) {
+ this.ex = ex;
+ return this;
+ }
+
+ public void unsetEx() {
+ this.ex = null;
+ }
+
+ /** Returns true if field ex is set (has been assigned a value) and false otherwise */
+ public boolean isSetEx() {
+ return this.ex != null;
+ }
+
+ public void setExIsSet(boolean value) {
+ if (!value) {
+ this.ex = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, java.lang.Object value) {
+ switch (field) {
+ case EX:
+ if (value == null) {
+ unsetEx();
+ } else {
+ setEx((OperationException)value);
+ }
+ break;
+
+ }
+ }
+
+ public java.lang.Object getFieldValue(_Fields field) {
+ switch (field) {
+ case EX:
+ return getEx();
+
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new java.lang.IllegalArgumentException();
+ }
+
+ switch (field) {
+ case EX:
+ return isSetEx();
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(java.lang.Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof executeCommand_result)
+ return this.equals((executeCommand_result)that);
+ return false;
+ }
+
+ public boolean equals(executeCommand_result that) {
+ if (that == null)
+ return false;
+ if (this == that)
+ return true;
+
+ boolean this_present_ex = true && this.isSetEx();
+ boolean that_present_ex = true && that.isSetEx();
+ if (this_present_ex || that_present_ex) {
+ if (!(this_present_ex && that_present_ex))
+ return false;
+ if (!this.ex.equals(that.ex))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int hashCode = 1;
+
+ hashCode = hashCode * 8191 + ((isSetEx()) ? 131071 : 524287);
+ if (isSetEx())
+ hashCode = hashCode * 8191 + ex.hashCode();
+
+ return hashCode;
+ }
+
+ @Override
+ public int compareTo(executeCommand_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = java.lang.Boolean.valueOf(isSetEx()).compareTo(other.isSetEx());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetEx()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.ex, other.ex);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ scheme(iprot).read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ scheme(oprot).write(oprot, this);
+ }
+
+ @Override
+ public java.lang.String toString() { // Renders as "executeCommand_result(ex:<value or null>)".
+ java.lang.StringBuilder sb = new java.lang.StringBuilder("executeCommand_result(");
+ boolean first = true; // assigned here and below but never read in this method
+
+ sb.append("ex:");
+ if (this.ex == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.ex);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException { // Currently a no-op: the body holds no checks.
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { // Writes this struct to the object stream in TCompactProtocol form.
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // rethrown as IOException, the only checked type this signature declares
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { // Reads this struct from the object stream in TCompactProtocol form.
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // Thrift failures surface to the caller as IOException with the cause attached
+ }
+ }
+
+ private static class executeCommand_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { // Factory for the field-tagged (standard) scheme.
+ public executeCommand_resultStandardScheme getScheme() {
+ return new executeCommand_resultStandardScheme(); // a new scheme instance on every call
+ }
+ }
+
+ private static class executeCommand_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme<executeCommand_result> { // Reads/writes the struct field by field with explicit field headers.
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, executeCommand_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true) // consume fields until the STOP marker
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // EX
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.ex = new OperationException();
+ struct.ex.read(iprot);
+ struct.setExIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // id matches but wire type does not: skip the value
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // unknown field id: skip the value
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, executeCommand_result struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.ex != null) { // the field is emitted only when a value is present
+ oprot.writeFieldBegin(EX_FIELD_DESC);
+ struct.ex.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class executeCommand_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { // Factory for the bitset-based (tuple) scheme.
+ public executeCommand_resultTupleScheme getScheme() {
+ return new executeCommand_resultTupleScheme(); // a new scheme instance on every call
+ }
+ }
+
+ private static class executeCommand_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme<executeCommand_result> { // Encodes field presence in a bitset instead of per-field headers.
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, executeCommand_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet optionals = new java.util.BitSet();
+ if (struct.isSetEx()) {
+ optionals.set(0); // bit 0 marks that 'ex' follows
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetEx()) {
+ struct.ex.write(oprot);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, executeCommand_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet incoming = iprot.readBitSet(1); // mirrors the single presence bit written by write(...)
+ if (incoming.get(0)) {
+ struct.ex = new OperationException();
+ struct.ex.read(iprot);
+ struct.setExIsSet(true);
+ }
+ }
+ }
+
+ private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) { // Selects the scheme implementation matching the protocol.
+ return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); // standard scheme if the protocol asks for it, tuple scheme otherwise
+ }
+ }
+
+}
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/resources/application.properties b/airavata-kubernetes/modules/agents/thrift-agent/src/main/resources/application.properties
new file mode 100644
index 0000000..bd5b373
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/resources/application.properties
@@ -0,0 +1,2 @@
+kafka.bootstrap.url=localhost:9092
+async.event.listener.topic=async-event-listener
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/agents/thrift-agent/src/main/resources/schema.thrift b/airavata-kubernetes/modules/agents/thrift-agent/src/main/resources/schema.thrift
new file mode 100644
index 0000000..9ff22d0
--- /dev/null
+++ b/airavata-kubernetes/modules/agents/thrift-agent/src/main/resources/schema.thrift
@@ -0,0 +1,11 @@
+namespace java org.apache.airavata.agents.thrift.stubs
+
+exception OperationException { // Error type declared by OperationService.executeCommand.
+ 1: string message,
+ 2: string stacktrace // NOTE(review): presumably the textual stack trace of the failure on the agent side - confirm
+}
+
+service OperationService // Remote interface exposing a single operation.
+{
+ void executeCommand(1:string command, 2:i64 callbackWorkflowId) throws (1:OperationException ex) // NOTE(review): callbackWorkflowId presumably names the workflow to notify afterwards - confirm in the handler
+}
diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessBootstrapDataResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessBootstrapDataResource.java
new file mode 100644
index 0000000..25f0962
--- /dev/null
+++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessBootstrapDataResource.java
@@ -0,0 +1,40 @@
+package org.apache.airavata.k8s.api.resources.process;
+
+/**
+ * Resource object holding one bootstrap-data entry of a process: an id plus a key/value pair.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class ProcessBootstrapDataResource {
+ private long id;
+ private String key;
+ private String value;
+
+ public long getId() {
+ return id;
+ }
+
+ public ProcessBootstrapDataResource setId(long id) { // setters return this so calls can be chained
+ this.id = id;
+ return this;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public ProcessBootstrapDataResource setKey(String key) {
+ this.key = key;
+ return this;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public ProcessBootstrapDataResource setValue(String value) {
+ this.value = value;
+ return this;
+ }
+}
diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessResource.java
index b5081cf..fe81007 100644
--- a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessResource.java
+++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessResource.java
@@ -41,6 +41,7 @@ public class ProcessResource {
private List<ProcessStatusResource> processStatuses = new ArrayList<>();
private List<TaskResource> tasks = new ArrayList<>();
private List<Long> processErrorIds = new ArrayList<>();
+ private List<ProcessBootstrapDataResource> processBootstrapData = new ArrayList<>();
private String taskDag;
private String experimentDataDir;
private String processType;
@@ -152,4 +153,13 @@ public class ProcessResource {
this.workflowId = workflowId;
return this;
}
+
+ public List<ProcessBootstrapDataResource> getProcessBootstrapData() { // Returns the internal list itself, not a copy.
+ return processBootstrapData;
+ }
+
+ public ProcessResource setProcessBootstrapData(List<ProcessBootstrapDataResource> processBootstrapData) { // Replaces the list; returns this for chaining.
+ this.processBootstrapData = processBootstrapData; // stored as-is, without copying or null-checking
+ return this;
+ }
}
diff --git a/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/AbstractTask.java b/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/AbstractTask.java
index 03589e7..a3ccffb 100644
--- a/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/AbstractTask.java
+++ b/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/AbstractTask.java
@@ -29,9 +29,9 @@ public abstract class AbstractTask extends UserContentStore implements Task {
public static final String PROCESS_ID = "process_id";
//Configurable values
- private String apiServerUrl = "localhost:8080";
- private String kafkaBootstrapUrl = "localhost:9092";
- private String eventTopic = "airavata-task-event";
+ private String apiServerUrl;
+ private String kafkaBootstrapUrl;
+ private String eventTopic;
private TaskCallbackContext callbackContext;
private RestTemplate restTemplate;
@@ -39,11 +39,19 @@ public abstract class AbstractTask extends UserContentStore implements Task {
private long processId;
private long taskId;
- public AbstractTask(TaskCallbackContext callbackContext) {
+ private PropertyResolver propertyResolver;
+
+ public AbstractTask(TaskCallbackContext callbackContext, PropertyResolver propertyResolver) { // Reads task/process ids from the callback's task config and connection settings from propertyResolver.
this.callbackContext = callbackContext;
this.taskId = Long.parseLong(this.callbackContext.getTaskConfig().getConfigMap().get(TASK_ID));
this.processId = Long.parseLong(this.callbackContext.getTaskConfig().getConfigMap().get(PROCESS_ID));
this.restTemplate = new RestTemplate();
+ this.propertyResolver = propertyResolver; // must be assigned before the getPropertyResolver() calls below
+
+ this.apiServerUrl = getPropertyResolver().get("api.server.url");
+ this.kafkaBootstrapUrl = getPropertyResolver().get("kafka.bootstrap.url");
+ this.eventTopic = getPropertyResolver().get("event.topic"); // NOTE(review): no check here for a missing property - confirm what PropertyResolver.get returns in that case
+
initializeKafkaEventProducer();
init();
}
@@ -135,4 +143,12 @@ public abstract class AbstractTask extends UserContentStore implements Task {
public long getTaskId() {
return taskId;
}
+
+ public Producer<String, String> getEventProducer() { // Exposes the task's Kafka producer field.
+ return eventProducer;
+ }
+
+ public PropertyResolver getPropertyResolver() { // Returns the resolver passed to the constructor.
+ return propertyResolver;
+ }
}
diff --git a/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/HelixParticipant.java b/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/HelixParticipant.java
index a2a56ca..ceb8126 100644
--- a/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/HelixParticipant.java
+++ b/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/HelixParticipant.java
@@ -38,12 +38,13 @@ public abstract class HelixParticipant implements Runnable {
private String taskTypeName;
private String apiServerUrl;
private RestTemplate restTemplate;
+ private PropertyResolver propertyResolver;
public HelixParticipant(String propertyFile) throws IOException {
logger.debug("Initializing Participant Node");
- PropertyResolver propertyResolver = new PropertyResolver();
+ this.propertyResolver = new PropertyResolver();
propertyResolver.loadInputStream(this.getClass().getClassLoader().getResourceAsStream(propertyFile));
this.zkAddress = propertyResolver.get("zookeeper.connection.url");
@@ -137,4 +138,8 @@ public abstract class HelixParticipant implements Runnable {
zkHelixManager.disconnect();
}
}
+
+ public PropertyResolver getPropertyResolver() { // Returns the resolver created and loaded in the constructor.
+ return propertyResolver;
+ }
}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/WorkflowController.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/WorkflowController.java
index 273a59f..67b4b2a 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/WorkflowController.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/WorkflowController.java
@@ -9,6 +9,7 @@ import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
+import java.util.Map;
/**
* TODO: Class level comments please
@@ -40,7 +41,12 @@ public class WorkflowController {
}
@GetMapping(path = "{id}/launch", produces = MediaType.APPLICATION_JSON_VALUE)
- public long launchExperiment(@PathVariable("id") long id) {
- return this.workflowService.launchWorkflow(id);
+ public long launchWorkflow(@PathVariable("id") long id) { // GET variant: launches the workflow without bootstrap data.
+ return this.workflowService.launchWorkflow(id, null); // null means no bootstrap entries; the service null-checks the map
+ }
+
+ @PostMapping(path = "{id}/launch", produces = MediaType.APPLICATION_JSON_VALUE)
+ public long launchWorkflow(@PathVariable("id") long id, @RequestBody Map<String, String> boostrapData) { // POST variant: the request body supplies bootstrap key/value pairs. NOTE(review): "boostrapData" looks like a typo of "bootstrapData".
+ return this.workflowService.launchWorkflow(id, boostrapData);
}
}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessBootstrapData.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessBootstrapData.java
new file mode 100644
index 0000000..5ed9222
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessBootstrapData.java
@@ -0,0 +1,63 @@
+package org.apache.airavata.k8s.api.server.model.process;
+
+import javax.persistence.*;
+
+/**
+ * JPA entity for the PROCESS_BOOTSTRAP_DATA table: one key/value pair attached to a ProcessModel.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Entity
+@Table(name = "PROCESS_BOOTSTRAP_DATA")
+public class ProcessBootstrapData {
+
+ @Id
+ @GeneratedValue(strategy = GenerationType.AUTO)
+ private long id;
+
+ @ManyToOne
+ private ProcessModel processModel; // owning side of the association; ProcessModel maps it with mappedBy = "processModel"
+
+ @Column(name = "DATA_KEY") // NOTE(review): presumably renamed because KEY is reserved in some SQL dialects - confirm
+ private String key;
+
+ @Column(name = "DATA_VALUE") // NOTE(review): presumably renamed because VALUE is reserved in some SQL dialects - confirm
+ private String value;
+
+ public long getId() {
+ return id;
+ }
+
+ public ProcessBootstrapData setId(long id) { // setters return this so calls can be chained
+ this.id = id;
+ return this;
+ }
+
+ public ProcessModel getProcessModel() {
+ return processModel;
+ }
+
+ public ProcessBootstrapData setProcessModel(ProcessModel processModel) {
+ this.processModel = processModel;
+ return this;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public ProcessBootstrapData setKey(String key) {
+ this.key = key;
+ return this;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public ProcessBootstrapData setValue(String value) {
+ this.value = value;
+ return this;
+ }
+}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessModel.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessModel.java
index 5a4054f..b605b9f 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessModel.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/process/ProcessModel.java
@@ -60,6 +60,9 @@ public class ProcessModel {
@OneToMany(mappedBy = "parentProcess", cascade = CascadeType.ALL)
private List<TaskModel> tasks = new ArrayList<>();
+ @OneToMany(mappedBy = "processModel", cascade = CascadeType.ALL)
+ private List<ProcessBootstrapData> processBootstrapData = new ArrayList<>();
+
private String taskDag;
@OneToMany
@@ -177,6 +180,15 @@ public class ProcessModel {
return this;
}
+ public List<ProcessBootstrapData> getProcessBootstrapData() { // Returns the mapped collection itself, not a copy.
+ return processBootstrapData;
+ }
+
+ public ProcessModel setProcessBootstrapData(List<ProcessBootstrapData> processBootstrapData) { // Replaces the collection; returns this for chaining.
+ this.processBootstrapData = processBootstrapData; // stored as-is, without copying or null-checking
+ return this;
+ }
+
public enum ProcessType {
WORKFLOW, EXPERIMENT;
}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/process/ProcessBootstrapDataRepository.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/process/ProcessBootstrapDataRepository.java
new file mode 100644
index 0000000..e79d74e
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/process/ProcessBootstrapDataRepository.java
@@ -0,0 +1,13 @@
+package org.apache.airavata.k8s.api.server.repository.process;
+
+import org.apache.airavata.k8s.api.server.model.process.ProcessBootstrapData;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ * Spring Data CRUD repository for {@link ProcessBootstrapData} entities with Long ids.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public interface ProcessBootstrapDataRepository extends CrudRepository<ProcessBootstrapData, Long> {
+}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/ProcessService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/ProcessService.java
index 500e83f..bee190f 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/ProcessService.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/ProcessService.java
@@ -21,9 +21,11 @@ package org.apache.airavata.k8s.api.server.service;
import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
import org.apache.airavata.k8s.api.server.ServerRuntimeException;
+import org.apache.airavata.k8s.api.server.model.process.ProcessBootstrapData;
import org.apache.airavata.k8s.api.server.model.process.ProcessModel;
import org.apache.airavata.k8s.api.server.model.process.ProcessStatus;
import org.apache.airavata.k8s.api.server.model.task.TaskModel;
+import org.apache.airavata.k8s.api.server.repository.process.ProcessBootstrapDataRepository;
import org.apache.airavata.k8s.api.server.repository.process.ProcessRepository;
import org.apache.airavata.k8s.api.resources.process.ProcessResource;
import org.apache.airavata.k8s.api.server.repository.process.ProcessStatusRepository;
@@ -45,6 +47,7 @@ public class ProcessService {
private ProcessRepository processRepository;
private ProcessStatusRepository processStatusRepository;
+ private ProcessBootstrapDataRepository bootstrapDataRepository;
private ExperimentService experimentService;
private TaskService taskService;
@@ -55,13 +58,15 @@ public class ProcessService {
ProcessStatusRepository processStatusRepository,
ExperimentService experimentService,
TaskService taskService,
- WorkflowRepository workflowRepository) {
+ WorkflowRepository workflowRepository,
+ ProcessBootstrapDataRepository bootstrapDataRepository) {
this.processRepository = processRepository;
this.processStatusRepository = processStatusRepository;
this.experimentService = experimentService;
this.taskService = taskService;
this.workflowRepository = workflowRepository;
+ this.bootstrapDataRepository = bootstrapDataRepository;
}
public long create(ProcessResource resource) {
@@ -95,6 +100,14 @@ public class ProcessService {
taskModel.setId(taskService.create(taskRes));
}));
+ Optional.ofNullable(resource.getProcessBootstrapData()).ifPresent(bootstrapDatas -> bootstrapDatas.forEach(data -> {
+ ProcessBootstrapData bootstrapData = new ProcessBootstrapData();
+ bootstrapData.setProcessModel(saved);
+ bootstrapData.setKey(data.getKey());
+ bootstrapData.setValue(data.getValue());
+ this.bootstrapDataRepository.save(bootstrapData);
+ }));
+
return saved.getId();
}
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
index 42be414..d76f5e5 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java
@@ -1,5 +1,6 @@
package org.apache.airavata.k8s.api.server.service;
+import org.apache.airavata.k8s.api.resources.process.ProcessBootstrapDataResource;
import org.apache.airavata.k8s.api.resources.process.ProcessResource;
import org.apache.airavata.k8s.api.resources.workflow.WorkflowResource;
import org.apache.airavata.k8s.api.server.ServerRuntimeException;
@@ -18,10 +19,7 @@ import org.apache.airavata.k8s.api.server.service.util.ToResourceUtil;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
/**
* TODO: Class level comments please
@@ -70,15 +68,22 @@ public class WorkflowService {
return saved.getId();
}
- public long launchWorkflow(long id) {
+ public long launchWorkflow(long id, Map<String, String> boostrapData) {
Workflow workflow = this.workflowRepository.findById(id)
.orElseThrow(() -> new ServerRuntimeException("Workflow with id " + id + "can not be found"));
+ List<ProcessBootstrapDataResource> bootstrapDataResources = new ArrayList<>();
+
+ if (boostrapData != null) {
+ boostrapData.forEach((key, value) ->
+ bootstrapDataResources.add(new ProcessBootstrapDataResource().setKey(key).setValue(value)));
+ }
long processId = processService.create(new ProcessResource()
.setName("Workflow Process : " + workflow.getName() + "-" + UUID.randomUUID().toString())
.setCreationTime(System.currentTimeMillis())
.setProcessType("WORKFLOW")
+ .setProcessBootstrapData(bootstrapDataResources)
.setWorkflowId(id));
try {
diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
index 35c7fde..6e61c47 100644
--- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
+++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java
@@ -20,6 +20,7 @@
package org.apache.airavata.k8s.api.server.service.util;
import org.apache.airavata.k8s.api.resources.experiment.ExperimentStatusResource;
+import org.apache.airavata.k8s.api.resources.process.ProcessBootstrapDataResource;
import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
import org.apache.airavata.k8s.api.resources.task.*;
import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
@@ -35,6 +36,7 @@ import org.apache.airavata.k8s.api.server.model.experiment.Experiment;
import org.apache.airavata.k8s.api.server.model.experiment.ExperimentInputData;
import org.apache.airavata.k8s.api.server.model.experiment.ExperimentOutputData;
import org.apache.airavata.k8s.api.server.model.experiment.ExperimentStatus;
+import org.apache.airavata.k8s.api.server.model.process.ProcessBootstrapData;
import org.apache.airavata.k8s.api.server.model.process.ProcessModel;
import org.apache.airavata.k8s.api.server.model.process.ProcessStatus;
import org.apache.airavata.k8s.api.server.model.task.*;
@@ -325,12 +327,27 @@ public class ToResourceUtil {
.ifPresent(tasks -> tasks.forEach(task -> processResource.getTasks().add(toResource(task).get())));
Optional.ofNullable(processModel.getProcessErrors())
.ifPresent(errs -> errs.forEach(err -> processResource.getProcessErrorIds().add(err.getId())));
+ Optional.ofNullable(processModel.getProcessBootstrapData())
+ .ifPresent(datas -> datas.forEach(data -> processResource.getProcessBootstrapData().add(toResource(data).get())));
+
return Optional.of(processResource);
} else {
return Optional.empty();
}
}
+ public static Optional<ProcessBootstrapDataResource> toResource(ProcessBootstrapData bootstrapData) {
+ // A null entity maps to an empty Optional; otherwise id, key and value are copied over.
+ if (bootstrapData == null) {
+ return Optional.empty();
+ }
+ ProcessBootstrapDataResource converted = new ProcessBootstrapDataResource()
+ .setId(bootstrapData.getId())
+ .setKey(bootstrapData.getKey())
+ .setValue(bootstrapData.getValue());
+ return Optional.of(converted);
+ }
+
public static Optional<ProcessStatusResource> toResource(ProcessStatus processStatus) {
if (processStatus != null) {
ProcessStatusResource resource = new ProcessStatusResource();
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/pom.xml b/airavata-kubernetes/modules/microservices/async-event-listener/pom.xml
new file mode 100644
index 0000000..7afdbf4
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-kubernetes</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>async-event-listener</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>api-resource</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-freemarker</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>jar</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <artifact-packaging>jar</artifact-packaging>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>1.4.3.RELEASE</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>repackage</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Create a docker image that runs the executable jar-->
+ <plugin>
+ <groupId>com.spotify</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <imageName>${docker.image.prefix}/async-event-listener</imageName> <!-- named after this module; "task-scheduler" was a copy-paste leftover that collides with that module's image -->
+ <baseImage>java:openjdk-8-jdk-alpine</baseImage>
+ <entryPoint>["java","-jar","/${project.build.finalName}.jar"]</entryPoint>
+ <resources>
+ <resource>
+ <targetPath>/</targetPath>
+ <directory>${project.build.directory}</directory>
+ <include>${project.build.finalName}.jar</include>
+ </resource>
+ </resources>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <profile>
+ <id>war</id>
+ <properties>
+ <artifact-packaging>war</artifact-packaging>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-tomcat</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>1.4.3.RELEASE</version>
+ <configuration>
+ <!-- this will get rid of version info from war file name; the name matches this module rather than task-scheduler -->
+ <finalName>async-event-listener</finalName>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/Application.java b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/Application.java
new file mode 100644
index 0000000..aa9b5d7
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/Application.java
@@ -0,0 +1,29 @@
+package org.apache.airavata.async.event.listener;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.web.client.RestTemplateBuilder;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * Spring Boot entry point of the async event listener service; also declares a RestTemplate bean.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@SpringBootApplication
+@Configuration // NOTE(review): @SpringBootApplication is already meta-annotated with @Configuration and @ComponentScan; these two look redundant - confirm before removing
+@ComponentScan
+public class Application {
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+
+ @Bean
+ public RestTemplate restTemplate(RestTemplateBuilder builder) { // built from the injected builder with no extra customisation
+ return builder.build();
+ }
+}
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/messaging/KafkaReceiver.java b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/messaging/KafkaReceiver.java
new file mode 100644
index 0000000..c864ae2
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/messaging/KafkaReceiver.java
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.async.event.listener.messaging;
+
+import org.apache.airavata.async.event.listener.service.ListenerService;
+import org.springframework.kafka.annotation.KafkaListener;
+
+import javax.annotation.Resource;
+
+/**
+ * Kafka consumer that turns "workflowId,event,message" payloads into
+ * {@link ListenerService#onEventReceived} calls.
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+public class KafkaReceiver {
+
+ @Resource
+ private ListenerService listenerService;
+
+ /**
+ * Parses one payload. The split limit of 3 keeps commas and empty text in the message part intact.
+ * @throws IllegalArgumentException if fewer than three fields are present or the id is not a number
+ */
+ @KafkaListener(topics = "${listener.topic.name}", containerFactory = "kafkaListenerContainerFactory")
+ public void receiveProcesses(String payload) {
+ System.out.println("received process=" + payload);
+ String[] parts = payload.split(",", 3);
+ if (parts.length < 3) {
+ throw new IllegalArgumentException("Malformed event payload, expected workflowId,event,message: " + payload);
+ }
+ long workflowId = Long.parseLong(parts[0]);
+ listenerService.onEventReceived(workflowId, parts[1], parts[2]);
+ }
+}
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/messaging/ReceiverConfig.java b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/messaging/ReceiverConfig.java
new file mode 100644
index 0000000..2a59c86
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/messaging/ReceiverConfig.java
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.async.event.listener.messaging;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Spring configuration for the Kafka consumer side of the listener: consumer
+ * properties, consumer factory, listener container factory and the
+ * {@link KafkaReceiver} bean itself.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Configuration
+@EnableKafka
+public class ReceiverConfig {
+
+    @Value("${kafka.bootstrap-servers}")
+    private String bootstrapServers;
+
+    @Value("${listener.group.name}")
+    private String listenerGroupName;
+
+    /**
+     * Builds the consumer properties: bootstrap servers, consumer group and
+     * String deserializers for both key and value.
+     */
+    @Bean
+    public Map<String, Object> consumerConfigs() {
+        Map<String, Object> settings = new HashMap<>();
+        // host:port pairs used for the initial connection to the Kafka cluster
+        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // consumers sharing this group id divide the records between them
+        settings.put(ConsumerConfig.GROUP_ID_CONFIG, listenerGroupName);
+        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return settings;
+    }
+
+    /** Consumer factory backed by {@link #consumerConfigs()}. */
+    @Bean
+    public ConsumerFactory<String, String> consumerFactory() {
+        Map<String, Object> settings = consumerConfigs();
+        return new DefaultKafkaConsumerFactory<>(settings);
+    }
+
+    /** Container factory referenced by name from the {@code @KafkaListener} methods. */
+    @Bean
+    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        containerFactory.setConsumerFactory(consumerFactory());
+        return containerFactory;
+    }
+
+    /** Registers the receiver so its {@code @KafkaListener} methods are picked up. */
+    @Bean
+    public KafkaReceiver receiver() {
+        return new KafkaReceiver();
+    }
+}
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/service/ListenerService.java b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/service/ListenerService.java
new file mode 100644
index 0000000..72f5309
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/java/org/apache/airavata/async/event/listener/service/ListenerService.java
@@ -0,0 +1,35 @@
+package org.apache.airavata.async.event.listener.service;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Forwards received events to the API server by posting them to the
+ * {@code /workflow/{workflowId}/launch} endpoint.
+ *
+ * @author dimuthu
+ * @since 1.0.0-SNAPSHOT
+ */
+@Service
+public class ListenerService {
+
+    private final RestTemplate restTemplate;
+
+    @Value("${api.server.url}")
+    private String apiServerUrl;
+
+    public ListenerService(RestTemplate restTemplate) {
+        this.restTemplate = restTemplate;
+    }
+
+    /**
+     * Posts the event and message to the API server's launch endpoint for the
+     * given workflow.
+     *
+     * @param workflowId id of the workflow to launch
+     * @param event      event name, sent under the {@code "event"} key
+     * @param message    event message, sent under the {@code "message"} key
+     */
+    public void onEventReceived(long workflowId, String event, String message) {
+        Map<String, String> bootstrapData = new HashMap<>();
+        bootstrapData.put("event", event);
+        bootstrapData.put("message", message);
+        // api.server.url is configured as a bare host:port, but RestTemplate needs
+        // an absolute URL, so add the scheme unless one is already present.
+        String baseUrl = apiServerUrl.contains("://") ? apiServerUrl : "http://" + apiServerUrl;
+        this.restTemplate.postForObject(baseUrl + "/workflow/" + workflowId + "/launch", bootstrapData, Long.class);
+    }
+}
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/resources/application.properties
new file mode 100644
index 0000000..f4f3812
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/resources/application.properties
@@ -0,0 +1,4 @@
+server.port = 9195
+listener.topic.name=async-event-listener
+listener.group.name=async-event-listener
+api.server.url=api-server.default.svc.cluster.local:8080
diff --git a/airavata-kubernetes/modules/microservices/async-event-listener/src/main/resources/application.yml b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/resources/application.yml
new file mode 100644
index 0000000..069dd61
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/async-event-listener/src/main/resources/application.yml
@@ -0,0 +1,4 @@
+kafka:
+ bootstrap-servers: kafka.default.svc.cluster.local:9092
+ topic:
+ helloworld: helloworld.t
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/async-command-task/pom.xml b/airavata-kubernetes/modules/microservices/tasks/async-command-task/pom.xml
new file mode 100644
index 0000000..a3a7669
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/tasks/async-command-task/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-kubernetes</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../../../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>async-command-task</artifactId>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.airavata.helix.task.async.command.Participant</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Create a docker image that runs the executable jar-->
+ <plugin>
+ <groupId>com.spotify</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <imageName>${docker.image.prefix}/async-command-task</imageName>
+ <baseImage>java:openjdk-8-jdk-alpine</baseImage>
+ <exposes>
+ <expose>8080</expose>
+ </exposes>
+ <entryPoint>["java","-jar","/${project.build.finalName}-jar-with-dependencies.jar"]</entryPoint>
+ <resources>
+ <resource>
+ <targetPath>/</targetPath>
+ <directory>${project.build.directory}</directory>
+ <include>${project.build.finalName}-jar-with-dependencies.jar</include>
+ </resource>
+ </resources>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>helix-task-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>agent-core</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>thrift-agent</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>api-resource</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/java/org/apache/airavata/helix/task/async/command/AsyncCommandTask.java
similarity index 68%
copy from airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
copy to airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/java/org/apache/airavata/helix/task/async/command/AsyncCommandTask.java
index 866c2c7..5ece6f2 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
+++ b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/java/org/apache/airavata/helix/task/async/command/AsyncCommandTask.java
@@ -1,15 +1,18 @@
-package org.apache.airavata.helix.task.command;
+package org.apache.airavata.helix.task.async.command;
+import org.apache.airavata.agents.core.AsyncOperation;
import org.apache.airavata.helix.api.AbstractTask;
+import org.apache.airavata.helix.api.PropertyResolver;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskOutPortTypeResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
-import org.apache.airavata.k8s.compute.api.ExecutionResult;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskResult;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
/**
@@ -18,9 +21,9 @@ import java.util.Arrays;
* @author dimuthu
* @since 1.0.0-SNAPSHOT
*/
-public class CommandTask extends AbstractTask {
+public class AsyncCommandTask extends AbstractTask {
- public static final String NAME = "COMMAND";
+ public static final String NAME = "ASYNC_COMMAND_TASK";
private String command;
private String arguments;
@@ -28,9 +31,10 @@ public class CommandTask extends AbstractTask {
private String stdErrPath;
private String computeResourceId;
private ComputeResource computeResource;
+ private Long callBackWorkflowId;
- public CommandTask(TaskCallbackContext callbackContext) {
- super(callbackContext);
+ public AsyncCommandTask(TaskCallbackContext callbackContext, PropertyResolver propertyResolver) {
+ super(callbackContext, propertyResolver);
}
@Override
@@ -40,48 +44,28 @@ public class CommandTask extends AbstractTask {
this.stdOutPath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.STD_OUT_PATH);
this.stdErrPath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.STD_ERR_PATH);
this.computeResourceId = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.COMPUTE_RESOURCE);
+ this.callBackWorkflowId = Long.parseLong(getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.CALLBACK_WORKFLOW));
this.computeResource = this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
+ "/compute/" + Long.parseLong(this.computeResourceId), ComputeResource.class);
+
}
+ @Override
public TaskResult onRun() {
- System.out.println("Executing command " + command);
try {
-
- String stdOutSuffix = " > " + stdOutPath + " 2> " + stdErrPath;
-
- publishTaskStatus(TaskStatusResource.State.EXECUTING, "");
-
- String finalCommand = command + (arguments != null ? arguments : "") + stdOutSuffix;
-
- System.out.println("Executing command " + finalCommand);
- Thread.sleep(200000);
- ExecutionResult executionResult = fetchComputeResourceOperation(computeResource).executeCommand(finalCommand);
-
- if (executionResult.getExitStatus() == 0) {
- publishTaskStatus(TaskStatusResource.State.COMPLETED, "Task completed");
- sendToOutPort("Out");
- return new TaskResult(TaskResult.Status.COMPLETED, "Task completed");
-
- } else if (executionResult.getExitStatus() == -1) {
- publishTaskStatus(TaskStatusResource.State.FAILED, "Process didn't exit successfully");
- sendToOutPort("Error");
- return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
-
- } else {
- publishTaskStatus(TaskStatusResource.State.FAILED, "Process exited with error status " + executionResult.getExitStatus());
- sendToOutPort("Error");
- return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
- }
-
- } catch (Exception e) {
-
+ AsyncOperation operation = (AsyncOperation) Class.forName("org.apache.airavata.agents.thrift.operation.ThriftAgentOperation")
+ .getConstructor(ComputeResource.class).newInstance(this.computeResource);
+ operation.executeCommandAsync(this.command, this.callBackWorkflowId);
+ return new TaskResult(TaskResult.Status.COMPLETED, "Task completed");
+ } catch (InstantiationException | IllegalAccessException |
+ InvocationTargetException | ClassNotFoundException | NoSuchMethodException | ClassCastException e) {
e.printStackTrace();
- publishTaskStatus(TaskStatusResource.State.FAILED, e.getMessage());
+ publishTaskStatus(TaskStatusResource.State.FAILED, "Failed to load async operation");
return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
}
}
+ @Override
public void onCancel() {
}
@@ -89,7 +73,6 @@ public class CommandTask extends AbstractTask {
public static TaskTypeResource getTaskType() {
TaskTypeResource taskTypeResource = new TaskTypeResource();
taskTypeResource.setName(NAME);
- taskTypeResource.setTopicName("airavata-command");
taskTypeResource.setIcon("assets/icons/ssh.png");
taskTypeResource.getInputTypes().addAll(
Arrays.asList(
@@ -106,6 +89,10 @@ public class CommandTask extends AbstractTask {
.setType("Long")
.setDefaultValue(""),
new TaskInputTypeResource()
+ .setName(PARAMS.CALLBACK_WORKFLOW)
+ .setType("Long")
+ .setDefaultValue(""),
+ new TaskInputTypeResource()
.setName(PARAMS.STD_OUT_PATH)
.setType("String")
.setDefaultValue(""),
@@ -114,6 +101,7 @@ public class CommandTask extends AbstractTask {
.setType("String")
.setDefaultValue("")));
+
taskTypeResource.getOutPorts().addAll(
Arrays.asList(
new TaskOutPortTypeResource()
@@ -133,5 +121,6 @@ public class CommandTask extends AbstractTask {
public static final String STD_OUT_PATH = "std_out_path";
public static final String STD_ERR_PATH = "std_err_path";
public static final String COMPUTE_RESOURCE = "compute_resource";
+ public static final String CALLBACK_WORKFLOW = "callback_workflow";
}
}
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/java/org/apache/airavata/helix/task/async/command/Participant.java
similarity index 84%
copy from airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
copy to airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/java/org/apache/airavata/helix/task/async/command/Participant.java
index 3b7e55e..57bb26c 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
+++ b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/java/org/apache/airavata/helix/task/async/command/Participant.java
@@ -1,4 +1,4 @@
-package org.apache.airavata.helix.task.command;
+package org.apache.airavata.helix.task.async.command;
import org.apache.airavata.helix.api.HelixParticipant;
import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
@@ -18,7 +18,6 @@ import java.util.Map;
*/
public class Participant extends HelixParticipant {
-
public Participant(String propertyFile) throws IOException {
super(propertyFile);
}
@@ -30,18 +29,18 @@ public class Participant extends HelixParticipant {
TaskFactory commandTaskFac = new TaskFactory() {
@Override
public Task createNewTask(TaskCallbackContext context) {
- return new CommandTask(context);
+ return new AsyncCommandTask(context, getPropertyResolver());
}
};
- taskRegistry.put(CommandTask.NAME, commandTaskFac);
+ taskRegistry.put(AsyncCommandTask.NAME, commandTaskFac);
return taskRegistry;
}
@Override
public TaskTypeResource getTaskType() {
- return CommandTask.getTaskType();
+ return AsyncCommandTask.getTaskType();
}
public static void main(String args[]) {
diff --git a/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/resources/application.properties
new file mode 100644
index 0000000..78ef57d
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/resources/application.properties
@@ -0,0 +1,8 @@
+api.server.url=api-server.default.svc.cluster.local:8080
+zookeeper.connection.url=localhost:2199
+helix.cluster.name=AiravataDemoCluster
+participant.name=async-command-p1
+task.type.name=ASYNC_COMMAND_TASK
+kafka.bootstrap.url=localhost:9092
+event.topic=airavata-task-event
+async.event.listener.topic=async-event-listener
diff --git a/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/resources/log4j.properties b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/resources/log4j.properties
new file mode 100644
index 0000000..5e31e3c
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/tasks/async-command-task/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to INFO and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
index 866c2c7..d5b8bda 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
+++ b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
@@ -1,6 +1,7 @@
package org.apache.airavata.helix.task.command;
import org.apache.airavata.helix.api.AbstractTask;
+import org.apache.airavata.helix.api.PropertyResolver;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
@@ -29,8 +30,8 @@ public class CommandTask extends AbstractTask {
private String computeResourceId;
private ComputeResource computeResource;
- public CommandTask(TaskCallbackContext callbackContext) {
- super(callbackContext);
+ public CommandTask(TaskCallbackContext callbackContext, PropertyResolver propertyResolver) {
+ super(callbackContext, propertyResolver);
}
@Override
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
index 3b7e55e..289135d 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
+++ b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
@@ -30,7 +30,7 @@ public class Participant extends HelixParticipant {
TaskFactory commandTaskFac = new TaskFactory() {
@Override
public Task createNewTask(TaskCallbackContext context) {
- return new CommandTask(context);
+ return new CommandTask(context, getPropertyResolver());
}
};
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/resources/application.properties
index 92292ab..73a9b9c 100644
--- a/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/resources/application.properties
+++ b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/resources/application.properties
@@ -3,3 +3,5 @@ zookeeper.connection.url=localhost:2199
helix.cluster.name=AiravataDemoCluster
participant.name=command-p2
task.type.name=COMMAND
+kafka.bootstrap.url=localhost:9092
+event.topic=airavata-task-event
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/DataInputTask.java b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/DataInputTask.java
index 00aeadc..cc6af6b 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/DataInputTask.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/DataInputTask.java
@@ -1,6 +1,7 @@
package org.apache.airavata.helix.task.datain;
import org.apache.airavata.helix.api.AbstractTask;
+import org.apache.airavata.helix.api.PropertyResolver;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
@@ -30,8 +31,8 @@ public class DataInputTask extends AbstractTask {
private String computeResourceId;
private ComputeResource computeResource;
- public DataInputTask(TaskCallbackContext callbackContext) {
- super(callbackContext);
+ public DataInputTask(TaskCallbackContext callbackContext, PropertyResolver propertyResolver) {
+ super(callbackContext, propertyResolver);
}
@Override
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/Participant.java b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/Participant.java
index d804352..498dfee 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/Participant.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/Participant.java
@@ -30,7 +30,7 @@ public class Participant extends HelixParticipant {
TaskFactory dataInTask = new TaskFactory() {
@Override
public Task createNewTask(TaskCallbackContext context) {
- return new DataInputTask(context);
+ return new DataInputTask(context, getPropertyResolver());
}
};
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/resources/application.properties
index 3f98946..acf5a9b 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/resources/application.properties
+++ b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/resources/application.properties
@@ -2,4 +2,6 @@ api.server.url=api-server.default.svc.cluster.local:8080
zookeeper.connection.url=localhost:2199
helix.cluster.name=AiravataDemoCluster
participant.name=data-in-p1
-task.type.name=DATA_INPUT
\ No newline at end of file
+task.type.name=DATA_INPUT
+kafka.bootstrap.url=localhost:9092
+event.topic=airavata-task-event
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/DataOutputTask.java b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/DataOutputTask.java
index ad6123f..0fe8fbe 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/DataOutputTask.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/DataOutputTask.java
@@ -1,6 +1,7 @@
package org.apache.airavata.helix.task.dataout;
import org.apache.airavata.helix.api.AbstractTask;
+import org.apache.airavata.helix.api.PropertyResolver;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
@@ -33,8 +34,8 @@ public class DataOutputTask extends AbstractTask {
private String computeResourceId;
private ComputeResource computeResource;
- public DataOutputTask(TaskCallbackContext callbackContext) {
- super(callbackContext);
+ public DataOutputTask(TaskCallbackContext callbackContext, PropertyResolver propertyResolver) {
+ super(callbackContext, propertyResolver);
}
@Override
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/Participant.java b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/Participant.java
index a26c654..1fe9fe6 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/Participant.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/dataout/Participant.java
@@ -29,7 +29,7 @@ public class Participant extends HelixParticipant {
TaskFactory dataInTask = new TaskFactory() {
@Override
public Task createNewTask(TaskCallbackContext context) {
- return new DataOutputTask(context);
+ return new DataOutputTask(context, getPropertyResolver());
}
};
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/resources/application.properties
index 37109d6..2e10b26 100644
--- a/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/resources/application.properties
+++ b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/resources/application.properties
@@ -2,4 +2,6 @@ api.server.url=api-server.default.svc.cluster.local:8080
zookeeper.connection.url=localhost:2199
helix.cluster.name=AiravataDemoCluster
participant.name=data-out-p1
-task.type.name=DATA_OUTPUT
\ No newline at end of file
+task.type.name=DATA_OUTPUT
+kafka.bootstrap.url=localhost:9092
+event.topic=airavata-task-event
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
index 4fe8579..37ca403 100644
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
+++ b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
@@ -1,5 +1,6 @@
package org.apache.airavata.k8s.gfac.core;
+import org.apache.airavata.k8s.api.resources.process.ProcessBootstrapDataResource;
import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
import org.apache.airavata.k8s.api.resources.task.TaskResource;
import org.apache.helix.HelixManagerFactory;
@@ -26,6 +27,7 @@ public class HelixWorkflowManager {
private long processId;
private List<TaskResource> tasks;
+ private List<ProcessBootstrapDataResource> boostrapData;
// out port id, next task id
private Map<Long, Long> edgeMap;
@@ -38,9 +40,9 @@ public class HelixWorkflowManager {
private String helixClusterName;
private String instanceName;
- public HelixWorkflowManager(long processId, List<TaskResource> tasks, Map<Long, Long> edgeMap,
- RestTemplate restTemplate, String apiServerUrl, String zkConnectionString,
- String helixClusterName, String instanceName) {
+ public HelixWorkflowManager(long processId, List<TaskResource> tasks, List<ProcessBootstrapDataResource> boostrapData,
+ Map<Long, Long> edgeMap, RestTemplate restTemplate, String apiServerUrl,
+ String zkConnectionString, String helixClusterName, String instanceName) {
this.processId = processId;
this.tasks = tasks;
this.edgeMap = edgeMap;
@@ -49,6 +51,7 @@ public class HelixWorkflowManager {
this.zkConnectionString = zkConnectionString;
this.helixClusterName = helixClusterName;
this.instanceName = instanceName;
+ this.boostrapData = boostrapData;
}
public void launchWorkflow() {
@@ -57,7 +60,7 @@ public class HelixWorkflowManager {
try {
updateProcessStatus(ProcessStatusResource.State.CREATED);
- Workflow.Builder workflowBuilder = createWorkflow();
+ Workflow.Builder workflowBuilder = createWorkflow(this.boostrapData);
WorkflowConfig.Builder config = new WorkflowConfig.Builder().setFailureThreshold(0);
workflowBuilder.setWorkflowConfig(config.build());
if (workflowBuilder == null) {
@@ -94,11 +97,11 @@ public class HelixWorkflowManager {
}
}
- private Workflow.Builder createWorkflow() {
+ private Workflow.Builder createWorkflow(List<ProcessBootstrapDataResource> bootstrapData) {
Optional<TaskResource> startingTask = tasks.stream().filter(TaskResource::isStartingTask).findFirst();
if (startingTask.isPresent()) {
Workflow.Builder workflow = new Workflow.Builder("Airavata_Process_" + processId).setExpiry(0);
- createWorkflowRecursively(startingTask.get(), workflow, null);
+ createWorkflowRecursively(startingTask.get(), workflow, null, bootstrapData);
return workflow;
} else {
System.out.println("No starting task for this process " + processId);
@@ -107,7 +110,8 @@ public class HelixWorkflowManager {
}
}
- private void createWorkflowRecursively(TaskResource taskResource, Workflow.Builder workflow, Long parentTaskId) {
+ private void createWorkflowRecursively(TaskResource taskResource, Workflow.Builder workflow, Long parentTaskId,
+ List<ProcessBootstrapDataResource> boostrapData) {
TaskConfig.Builder taskBuilder = new TaskConfig.Builder().setTaskId("Task_" + taskResource.getId())
.setCommand(taskResource.getTaskType().getName());
@@ -119,6 +123,10 @@ public class HelixWorkflowManager {
taskBuilder.addConfig("task_id", taskResource.getId() + "");
taskBuilder.addConfig("process_id", taskResource.getParentProcessId() + "");
+ Optional.ofNullable(boostrapData).ifPresent(data -> {
+ data.forEach(d -> taskBuilder.addConfig(d.getKey(), d.getValue()));
+ });
+
Optional.ofNullable(taskResource.getOutPorts()).ifPresent(outPorts -> outPorts.forEach(outPort -> {
Optional.ofNullable(edgeMap.get(outPort.getId())).ifPresent(nextTask -> {
Optional<TaskResource> nextTaskResource = tasks.stream().filter(task -> task.getId() == nextTask).findFirst();
@@ -148,7 +156,7 @@ public class HelixWorkflowManager {
Optional<TaskResource> nextTaskResource = tasks.stream().filter(task -> task.getId() == nextTask).findFirst();
nextTaskResource.ifPresent(t -> {
- createWorkflowRecursively(t, workflow, taskResource.getId());
+ createWorkflowRecursively(t, workflow, taskResource.getId(), null);
});
});
}));
diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
index 888f469..2ea84de 100644
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
+++ b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
@@ -85,7 +85,8 @@ public class WorkerService {
//processLifecycleStore.put(processId, manager);
- final HelixWorkflowManager helixWorkflowManager = new HelixWorkflowManager(processId, taskResources, edgeMap,
+ final HelixWorkflowManager helixWorkflowManager = new HelixWorkflowManager(processId, taskResources,
+ processResource.getProcessBootstrapData(), edgeMap,
restTemplate, apiServerUrl,
zkConnectionString, helixClusterName, instanceName);
diff --git a/airavata-kubernetes/pom.xml b/airavata-kubernetes/pom.xml
index 30b9226..9ca584f 100644
--- a/airavata-kubernetes/pom.xml
+++ b/airavata-kubernetes/pom.xml
@@ -41,6 +41,10 @@
<module>modules/microservices/tasks/command-task</module>
<module>modules/microservices/tasks/data-in-task</module>
<module>modules/microservices/tasks/data-out-task</module>
+ <module>modules/agents/agent-core</module>
+ <module>modules/agents/thrift-agent</module>
+ <module>modules/microservices/tasks/async-command-task</module>
+ <module>modules/microservices/async-event-listener</module>
</modules>
<dependencyManagement>
--
To stop receiving notification emails like this one, please contact
"commits@airavata.apache.org" <co...@airavata.apache.org>.